Skip to content

Lockdown services

Every service takes a service provider and is used as an async context manager. They all derive from LockdownService.

Base class

pymobiledevice3.services.lockdown_service.LockdownService

Base class for all services that wrap a single lockdown service on the device.

A subclass binds to a named lockdown service (service_name) and is constructed with a LockdownServiceProvider (a lockdown client or an RSD/tunnel provider) that knows how to reach the device. The underlying service connection is established lazily: it is opened on first use, on an explicit connect call, or on entering the async context manager, and is closed on close or on exit.

Instances are intended to be used as async context managers::

async with SomeService(lockdown) as service:
    ...

Attributes:

Name Type Description
service_name str

name of the wrapped lockdown service.

lockdown LockdownServiceProvider

service provider used to start the service and reach the device.

logger Logger

logger named after the subclass module.

Source code in pymobiledevice3/services/lockdown_service.py
class LockdownService:
    """
    Base class for all services that wrap a single lockdown service on the device.

    A subclass binds to a named lockdown service (``service_name``) and is constructed with a
    `LockdownServiceProvider` (a lockdown client
    or an RSD/tunnel provider) that knows how to reach the device. The underlying service
    connection is established lazily: it is opened on first use, on an explicit `connect`
    call, or on entering the async context manager, and is closed on `close` or on exit.

    Instances are intended to be used as async context managers::

        async with SomeService(lockdown) as service:
            ...

    :ivar service_name: name of the wrapped lockdown service.
    :ivar lockdown: service provider used to start the service and reach the device.
    :ivar logger: logger named after the subclass module.
    """

    def __init__(
        self,
        lockdown: LockdownServiceProvider,
        service_name: str,
        is_developer_service: bool = False,
        service: Optional[ServiceConnection] = None,
        include_escrow_bag: bool = False,
    ) -> None:
        """
        :param lockdown: service provider used to start the service and communicate with the device.
        :param service_name: name of the lockdown service to wrap; started lazily on first connection.
        :param is_developer_service: when True, the service is started via the developer-service path,
            which requires the DeveloperDiskImage to be mounted.
        :param service: an already-established service connection. When provided, no connection is
            started; otherwise a connection to ``service_name`` is opened lazily.
        :param include_escrow_bag: when True, include the host escrow bag when starting the service.
        """
        self._is_developer_service = is_developer_service
        self._include_escrow_bag = include_escrow_bag
        self.service_name: str = service_name
        self.lockdown: LockdownServiceProvider = lockdown
        self._service: Optional[ServiceConnection] = service
        self._service_proxy = _LazyServiceConnection(self)
        self.logger: logging.Logger = logging.getLogger(self.__module__)
        # Shared Future: the first connect() call creates it; concurrent callers join it
        # instead of racing to call start_lockdown_service() multiple times.
        self._connect_future: Optional[asyncio.Future] = None

    @property
    def service(self) -> Union[ServiceConnection, _LazyServiceConnection]:
        """
        The wrapped service connection.

        :returns: the established `ServiceConnection`
            if already connected, otherwise a lazy proxy that connects on first awaited use.
        """
        if self._service is not None:
            return self._service
        return self._service_proxy

    async def __aenter__(self) -> Self:
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def close(self) -> None:
        """Close the underlying service connection (if any) and reset connection state."""
        if self._service is not None:
            await self._service.close()
            self._service = None
        self._connect_future = None

    async def connect(self) -> None:
        """
        Start and connect the wrapped service if not already connected.

        Does nothing when a connection already exists. Concurrent callers share a single
        in-flight connection attempt rather than starting the service multiple times; on
        failure the attempt is reset so it may be retried.

        :raises StartServiceError: if the service fails to start.
        """
        if self._service is not None:
            return
        if self._connect_future is not None:
            return await self._connect_future
        fut = asyncio.get_running_loop().create_future()
        self._connect_future = fut
        start_service = (
            self.lockdown.start_lockdown_developer_service
            if self._is_developer_service
            else self.lockdown.start_lockdown_service
        )
        try:
            self._service = await start_service(self.service_name, include_escrow_bag=self._include_escrow_bag)
            fut.set_result(None)
        except BaseException as e:
            self._connect_future = None  # allow retry
            if isinstance(e, asyncio.CancelledError):
                fut.cancel()
            else:
                fut.set_exception(e)
            raise

service property

service: Union[ServiceConnection, _LazyServiceConnection]

The wrapped service connection.

Returns:

Type Description
Union[ServiceConnection, _LazyServiceConnection]

the established ServiceConnection if already connected, otherwise a lazy proxy that connects on first awaited use.

close async

close() -> None

Close the underlying service connection (if any) and reset connection state.

Source code in pymobiledevice3/services/lockdown_service.py
async def close(self) -> None:
    """Close the underlying service connection (if any) and reset connection state."""
    if self._service is not None:
        await self._service.close()
        self._service = None
    self._connect_future = None

connect async

connect() -> None

Start and connect the wrapped service if not already connected.

Does nothing when a connection already exists. Concurrent callers share a single in-flight connection attempt rather than starting the service multiple times; on failure the attempt is reset so it may be retried.

Raises:

Type Description
StartServiceError

if the service fails to start.

Source code in pymobiledevice3/services/lockdown_service.py
async def connect(self) -> None:
    """
    Start and connect the wrapped service if not already connected.

    Does nothing when a connection already exists. Concurrent callers share a single
    in-flight connection attempt rather than starting the service multiple times; on
    failure the attempt is reset so it may be retried.

    :raises StartServiceError: if the service fails to start.
    """
    if self._service is not None:
        return
    if self._connect_future is not None:
        return await self._connect_future
    fut = asyncio.get_running_loop().create_future()
    self._connect_future = fut
    start_service = (
        self.lockdown.start_lockdown_developer_service
        if self._is_developer_service
        else self.lockdown.start_lockdown_service
    )
    try:
        self._service = await start_service(self.service_name, include_escrow_bag=self._include_escrow_bag)
        fut.set_result(None)
    except BaseException as e:
        self._connect_future = None  # allow retry
        if isinstance(e, asyncio.CancelledError):
            fut.cancel()
        else:
            fut.set_exception(e)
        raise

Common services

pymobiledevice3.services.os_trace.OsTraceService

Bases: LockdownService

Client for the device's com.apple.os_trace_relay service.

Provides API for the following operations:

  • List the running processes (pid to process name mapping).
  • Stream live syslog entries in binary form, optionally filtered by pid.
  • Retrieve the device's stored log archive (the contents of the /var/db/diagnostics directory), either as a raw PAX-format tar stream or extracted into a .logarchive directory consumable by log / Console.

The service is reached over the classic lockdown service on SERVICE_NAME, or over the RemoteServiceDiscovery shim (RSD_SERVICE_NAME) when a non-LockdownClient provider is supplied.

Source code in pymobiledevice3/services/os_trace.py
class OsTraceService(LockdownService):
    """
    Client for the device's ``com.apple.os_trace_relay`` service.

    Provides API for the following operations:

    * List the running processes (pid to process name mapping).
    * Stream live syslog entries in binary form, optionally filtered by pid.
    * Retrieve the device's stored log archive (the contents of the ``/var/db/diagnostics``
      directory), either as a raw PAX-format tar stream or extracted into a ``.logarchive``
      directory consumable by ``log`` / Console.

    The service is reached over the classic lockdown service on `SERVICE_NAME`, or over the
    RemoteServiceDiscovery shim (`RSD_SERVICE_NAME`) when a non-`LockdownClient`
    provider is supplied.
    """

    SERVICE_NAME = "com.apple.os_trace_relay"
    RSD_SERVICE_NAME = "com.apple.os_trace_relay.shim.remote"

    def __init__(self, lockdown: LockdownServiceProvider):
        if isinstance(lockdown, LockdownClient):
            super().__init__(lockdown, self.SERVICE_NAME)
        else:
            super().__init__(lockdown, self.RSD_SERVICE_NAME)

    async def get_pid_list(self):
        """
        Request the device's process list.

        :returns: The decoded ``PidList`` response plist. Its ``Payload`` entry maps each pid to a
            dict of per-process metadata (such as ``ProcessName``).
        """
        await self.connect()
        assert self.service is not None
        await self.service.send_plist({"Request": "PidList"})

        # ignore first received unknown byte
        await self.service.recvall(1)

        response = await self.service.recv_prefixed()
        return plistlib.loads(response)

    async def create_archive(
        self,
        out: typing.IO,
        size_limit: typing.Optional[int] = None,
        age_limit: typing.Optional[int] = None,
        start_time: typing.Optional[int] = None,
    ):
        """
        Request a log archive from the device and stream its raw bytes into ``out``.

        The device returns the archive as a PAX-format tar stream (the contents of
        ``/var/db/diagnostics``), which is written to ``out`` in chunks until the connection is
        terminated. Use `collect` instead to obtain an extracted ``.logarchive`` directory.

        :param out: A writable binary file-like object to receive the archive bytes.
        :param size_limit: Optional maximum archive size in bytes.
        :param age_limit: Optional maximum age, in days, of entries to include.
        :param start_time: Optional earliest entry time, as a unix timestamp.
        :raises AssertionError: If the device does not acknowledge the request with a successful status.
        """
        request = {"Request": "CreateArchive"}

        if size_limit is not None:
            request.update({"SizeLimit": size_limit})

        if age_limit is not None:
            request.update({"AgeLimit": age_limit})

        if start_time is not None:
            request.update({"StartTime": start_time})

        await self.connect()
        assert self.service is not None
        await self.service.send_plist(request)

        assert (await self.service.recvall(1))[0] == 1

        assert plistlib.loads(await self.service.recv_prefixed()).get("Status") == "RequestSuccessful", "Invalid status"

        while True:
            try:
                assert (await self.service.recvall(1))[0] == 3, "invalid magic"
            except ConnectionTerminatedError:
                break
            out.write(await self.service.recv_prefixed(endianity="<"))

    async def collect(
        self,
        out: str,
        size_limit: typing.Optional[int] = None,
        age_limit: typing.Optional[int] = None,
        start_time: typing.Optional[int] = None,
    ) -> None:
        """
        Collect the device's system logs into a ``.logarchive`` that can be inspected later with
        tools such as ``log`` or Console.

        Internally calls `create_archive` to fetch the tar stream into a temporary file, then
        extracts it into the ``out`` directory.

        :param out: Destination directory path into which the archive is extracted.
        :param size_limit: Optional maximum archive size in bytes.
        :param age_limit: Optional maximum age, in days, of entries to include.
        :param start_time: Optional earliest entry time, as a unix timestamp.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            file = Path(temp_dir) / "foo.tar"
            with open(file, "wb") as f:
                await self.create_archive(f, size_limit=size_limit, age_limit=age_limit, start_time=start_time)
            TarFile(file).extractall(out)

    async def syslog(
        self,
        pid: int = -1,
        message_filter: int = OS_TRACE_RELAY_MESSAGE_FILTER_ALL,
        stream_flags: int = OS_TRACE_RELAY_STREAM_FLAGS_DEFAULT,
    ) -> typing.AsyncGenerator[SyslogEntry, None]:
        """
        Stream live syslog entries from the device.

        Starts an activity stream on the device and yields each incoming record, parsed via
        `parse_syslog_entry`, indefinitely until the caller stops iterating or the connection
        drops.

        :param pid: Restrict the stream to a single process by pid; ``-1`` (the default) streams
            entries from all processes.
        :param message_filter: Bitmask selecting which message levels to receive; defaults to
            `OS_TRACE_RELAY_MESSAGE_FILTER_ALL` (all levels).
        :param stream_flags: Bitmask of `OsActivityStreamFlag` values controlling stream
            contents and decoding; defaults to `OS_TRACE_RELAY_STREAM_FLAGS_DEFAULT`.
        :yields: Each decoded log entry as it arrives.
        :raises PyMobileDevice3Exception: If the device rejects the stream-start request.
        """
        await self.connect()
        assert self.service is not None
        await self.service.send_plist({
            "Request": "StartActivity",
            "MessageFilter": message_filter,
            "Pid": pid,
            "StreamFlags": stream_flags,
        })

        (length_length,) = struct.unpack("<I", await self.service.recvall(4))
        length = int((await self.service.recvall(length_length))[::-1].hex(), 16)
        response = plistlib.loads(await self.service.recvall(length))

        if response.get("Status") != "RequestSuccessful":
            raise PyMobileDevice3Exception(f"got invalid response: {response}")

        while True:
            assert await self.service.recvall(1) == b"\x02"
            (length,) = struct.unpack("<I", await self.service.recvall(4))
            line = await self.service.recvall(length)
            yield parse_syslog_entry(line)

get_pid_list async

get_pid_list()

Request the device's process list.

Returns:

Type Description

The decoded PidList response plist. Its Payload entry maps each pid to a dict of per-process metadata (such as ProcessName).

Source code in pymobiledevice3/services/os_trace.py
async def get_pid_list(self):
    """
    Request the device's process list.

    :returns: The decoded ``PidList`` response plist. Its ``Payload`` entry maps each pid to a
        dict of per-process metadata (such as ``ProcessName``).
    """
    await self.connect()
    assert self.service is not None
    await self.service.send_plist({"Request": "PidList"})

    # ignore first received unknown byte
    await self.service.recvall(1)

    response = await self.service.recv_prefixed()
    return plistlib.loads(response)

create_archive async

create_archive(out: IO, size_limit: Optional[int] = None, age_limit: Optional[int] = None, start_time: Optional[int] = None)

Request a log archive from the device and stream its raw bytes into out.

The device returns the archive as a PAX-format tar stream (the contents of /var/db/diagnostics), which is written to out in chunks until the connection is terminated. Use collect instead to obtain an extracted .logarchive directory.

Parameters:

Name Type Description Default
out IO

A writable binary file-like object to receive the archive bytes.

required
size_limit Optional[int]

Optional maximum archive size in bytes.

None
age_limit Optional[int]

Optional maximum age, in days, of entries to include.

None
start_time Optional[int]

Optional earliest entry time, as a unix timestamp.

None

Raises:

Type Description
AssertionError

If the device does not acknowledge the request with a successful status.

Source code in pymobiledevice3/services/os_trace.py
async def create_archive(
    self,
    out: typing.IO,
    size_limit: typing.Optional[int] = None,
    age_limit: typing.Optional[int] = None,
    start_time: typing.Optional[int] = None,
):
    """
    Request a log archive from the device and stream its raw bytes into ``out``.

    The device returns the archive as a PAX-format tar stream (the contents of
    ``/var/db/diagnostics``), which is written to ``out`` in chunks until the connection is
    terminated. Use `collect` instead to obtain an extracted ``.logarchive`` directory.

    :param out: A writable binary file-like object to receive the archive bytes.
    :param size_limit: Optional maximum archive size in bytes.
    :param age_limit: Optional maximum age, in days, of entries to include.
    :param start_time: Optional earliest entry time, as a unix timestamp.
    :raises AssertionError: If the device does not acknowledge the request with a successful status.
    """
    request = {"Request": "CreateArchive"}

    if size_limit is not None:
        request.update({"SizeLimit": size_limit})

    if age_limit is not None:
        request.update({"AgeLimit": age_limit})

    if start_time is not None:
        request.update({"StartTime": start_time})

    await self.connect()
    assert self.service is not None
    await self.service.send_plist(request)

    assert (await self.service.recvall(1))[0] == 1

    assert plistlib.loads(await self.service.recv_prefixed()).get("Status") == "RequestSuccessful", "Invalid status"

    while True:
        try:
            assert (await self.service.recvall(1))[0] == 3, "invalid magic"
        except ConnectionTerminatedError:
            break
        out.write(await self.service.recv_prefixed(endianity="<"))

collect async

collect(out: str, size_limit: Optional[int] = None, age_limit: Optional[int] = None, start_time: Optional[int] = None) -> None

Collect the device's system logs into a .logarchive that can be inspected later with tools such as log or Console.

Internally calls create_archive to fetch the tar stream into a temporary file, then extracts it into the out directory.

Parameters:

Name Type Description Default
out str

Destination directory path into which the archive is extracted.

required
size_limit Optional[int]

Optional maximum archive size in bytes.

None
age_limit Optional[int]

Optional maximum age, in days, of entries to include.

None
start_time Optional[int]

Optional earliest entry time, as a unix timestamp.

None
Source code in pymobiledevice3/services/os_trace.py
async def collect(
    self,
    out: str,
    size_limit: typing.Optional[int] = None,
    age_limit: typing.Optional[int] = None,
    start_time: typing.Optional[int] = None,
) -> None:
    """
    Collect the device's system logs into a ``.logarchive`` that can be inspected later with
    tools such as ``log`` or Console.

    Internally calls `create_archive` to fetch the tar stream into a temporary file, then
    extracts it into the ``out`` directory.

    :param out: Destination directory path into which the archive is extracted.
    :param size_limit: Optional maximum archive size in bytes.
    :param age_limit: Optional maximum age, in days, of entries to include.
    :param start_time: Optional earliest entry time, as a unix timestamp.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        file = Path(temp_dir) / "foo.tar"
        with open(file, "wb") as f:
            await self.create_archive(f, size_limit=size_limit, age_limit=age_limit, start_time=start_time)
        TarFile(file).extractall(out)

syslog async

syslog(pid: int = -1, message_filter: int = OS_TRACE_RELAY_MESSAGE_FILTER_ALL, stream_flags: int = OS_TRACE_RELAY_STREAM_FLAGS_DEFAULT) -> typing.AsyncGenerator[SyslogEntry, None]

Stream live syslog entries from the device.

Starts an activity stream on the device and yields each incoming record, parsed via parse_syslog_entry, indefinitely until the caller stops iterating or the connection drops.

:yields: Each decoded log entry as it arrives.

Parameters:

Name Type Description Default
pid int

Restrict the stream to a single process by pid; -1 (the default) streams entries from all processes.

-1
message_filter int

Bitmask selecting which message levels to receive; defaults to OS_TRACE_RELAY_MESSAGE_FILTER_ALL (all levels).

OS_TRACE_RELAY_MESSAGE_FILTER_ALL
stream_flags int

Bitmask of OsActivityStreamFlag values controlling stream contents and decoding; defaults to OS_TRACE_RELAY_STREAM_FLAGS_DEFAULT.

OS_TRACE_RELAY_STREAM_FLAGS_DEFAULT

Raises:

Type Description
PyMobileDevice3Exception

If the device rejects the stream-start request.

Source code in pymobiledevice3/services/os_trace.py
async def syslog(
    self,
    pid: int = -1,
    message_filter: int = OS_TRACE_RELAY_MESSAGE_FILTER_ALL,
    stream_flags: int = OS_TRACE_RELAY_STREAM_FLAGS_DEFAULT,
) -> typing.AsyncGenerator[SyslogEntry, None]:
    """
    Stream live syslog entries from the device.

    Starts an activity stream on the device and yields each incoming record, parsed via
    `parse_syslog_entry`, indefinitely until the caller stops iterating or the connection
    drops.

    :param pid: Restrict the stream to a single process by pid; ``-1`` (the default) streams
        entries from all processes.
    :param message_filter: Bitmask selecting which message levels to receive; defaults to
        `OS_TRACE_RELAY_MESSAGE_FILTER_ALL` (all levels).
    :param stream_flags: Bitmask of `OsActivityStreamFlag` values controlling stream
        contents and decoding; defaults to `OS_TRACE_RELAY_STREAM_FLAGS_DEFAULT`.
    :yields: Each decoded log entry as it arrives.
    :raises PyMobileDevice3Exception: If the device rejects the stream-start request.
    """
    await self.connect()
    assert self.service is not None
    await self.service.send_plist({
        "Request": "StartActivity",
        "MessageFilter": message_filter,
        "Pid": pid,
        "StreamFlags": stream_flags,
    })

    (length_length,) = struct.unpack("<I", await self.service.recvall(4))
    length = int((await self.service.recvall(length_length))[::-1].hex(), 16)
    response = plistlib.loads(await self.service.recvall(length))

    if response.get("Status") != "RequestSuccessful":
        raise PyMobileDevice3Exception(f"got invalid response: {response}")

    while True:
        assert await self.service.recvall(1) == b"\x02"
        (length,) = struct.unpack("<I", await self.service.recvall(4))
        line = await self.service.recvall(length)
        yield parse_syslog_entry(line)

pymobiledevice3.services.installation_proxy.InstallationProxyService

Bases: LockdownService

Client for the com.apple.mobile.installation_proxy lockdown service.

Provides access to the device's application installation database: installing, upgrading, uninstalling, archiving/restoring, browsing and looking up apps, as well as uploading carrier bundles (IPCC). Most operations send a plist command over the service connection and stream back progress and status responses.

Source code in pymobiledevice3/services/installation_proxy.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
class InstallationProxyService(LockdownService):
    """
    Client for the ``com.apple.mobile.installation_proxy`` lockdown service.

    Provides access to the device's application installation database: installing,
    upgrading, uninstalling, archiving/restoring, browsing and looking up apps, as well
    as uploading carrier bundles (IPCC). Most operations send a plist command over the
    service connection and stream back progress and status responses.
    """

    SERVICE_NAME = "com.apple.mobile.installation_proxy"
    RSD_SERVICE_NAME = "com.apple.mobile.installation_proxy.shim.remote"

    def __init__(self, lockdown: LockdownServiceProvider):
        if isinstance(lockdown, LockdownClient):
            super().__init__(lockdown, self.SERVICE_NAME)
        else:
            super().__init__(lockdown, self.RSD_SERVICE_NAME)

    async def _watch_completion(self, handler: Optional[Callable] = None, *args) -> None:
        while True:
            response = await self.service.recv_plist()
            if not response:
                break
            error = response.get("Error")
            if error:
                raise AppInstallError(f"{error}: {response.get('ErrorDescription')}")
            completion = response.get("PercentComplete")
            if completion:
                if handler:
                    self.logger.debug("calling handler")
                    handler(completion, *args)
                self.logger.info(f"{completion}% Complete")
            if response.get("Status") == "Complete":
                self.logger.info("Installation succeed.")
                return
        raise AppInstallError()

    async def send_cmd_for_bundle_identifier(
        self,
        bundle_identifier: str,
        cmd: str = "Archive",
        options: Optional[dict] = None,
        handler: Optional[Callable] = None,
        *args,
    ) -> None:
        """
        Send a command that targets an installed app by its bundle identifier and wait for completion.

        Sends a plist of the form ``{"Command": cmd, "ApplicationIdentifier": bundle_identifier,
        "ClientOptions": options}`` to the service, then consumes progress/status responses until
        the operation completes. Used to back `restore` and `uninstall`.

        :param bundle_identifier: Bundle identifier of the target app, sent as ``ApplicationIdentifier``.
        :param cmd: Installation-proxy command name, sent as ``Command``. Defaults to ``"Archive"``.
        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
            progress response. ``None`` disables progress callbacks.
        :param args: Extra positional arguments forwarded to ``handler``.
        :returns: None.
        :raises AppInstallError: If the service reports an error or finishes without a ``Complete`` status.
        """
        cmd: dict = {"Command": cmd, "ApplicationIdentifier": bundle_identifier}

        if options is None:
            options = {}

        cmd.update({"ClientOptions": options})
        await self.service.send_plist(cmd)
        await self._watch_completion(handler, *args)

    async def upgrade(
        self, ipa_path: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args
    ) -> None:
        """
        Upgrade an installed app from a local package.

        Delegates to `install_from_local` with the ``"Upgrade"`` command.

        :param ipa_path: Local path to the package (``.ipa``/``.ipcc`` or an app directory).
        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
            progress response. ``None`` disables progress callbacks.
        :param args: Extra positional arguments forwarded to ``handler``.
        :returns: None.
        :raises AppInstallError: If the upgrade fails.
        """
        await self.install_from_local(ipa_path, "Upgrade", options, handler, args)

    async def restore(
        self, bundle_identifier: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args
    ) -> None:
        """
        Restore a previously archived app, identified by its bundle identifier.

        Sends the ``"Restore"`` command via `send_cmd_for_bundle_identifier`.

        :param bundle_identifier: Bundle identifier of the app to restore.
        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
            progress response. ``None`` disables progress callbacks.
        :param args: Extra positional arguments forwarded to ``handler``.
        :returns: None.
        :raises AppInstallError: If the restore fails.
        """
        await self.send_cmd_for_bundle_identifier(bundle_identifier, "Restore", options, handler, args)

    async def uninstall(
        self, bundle_identifier: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args
    ) -> None:
        """
        Uninstall an app, identified by its bundle identifier.

        Sends the ``"Uninstall"`` command via `send_cmd_for_bundle_identifier`.

        :param bundle_identifier: Bundle identifier of the app to uninstall.
        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
            progress response. ``None`` disables progress callbacks.
        :param args: Extra positional arguments forwarded to ``handler``.
        :returns: None.
        :raises AppInstallError: If the uninstall fails.
        """
        await self.send_cmd_for_bundle_identifier(bundle_identifier, "Uninstall", options, handler, args)

    async def install_from_bytes(
        self,
        package_bytes: bytes,
        cmd: str = "Install",
        options: Optional[dict] = None,
        handler: Optional[Callable] = None,
        *args,
    ) -> None:
        """
        Install an app or carrier bundle from a zipped package held in memory.

        The package type is detected from the zip contents (see `classify_zip_file`): a
        ``.app`` payload is treated as an ``.ipa``, a ``.bundle`` payload as an ``.ipcc``. For
        IPCC packages, ``PackageType`` is forced to ``"CarrierBundle"`` in the options. The bytes
        are uploaded to a temporary path under ``/PublicStaging/pymobiledevice3`` via AFC, the
        command is dispatched with `send_package`, and the temporary file is removed
        afterwards.

        :param package_bytes: Raw bytes of the ``.ipa``/``.ipcc`` zip package.
        :param cmd: Installation-proxy command name, sent as ``Command``. Defaults to ``"Install"``.
        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
            progress response. ``None`` disables progress callbacks.
        :param args: Extra positional arguments forwarded to ``handler``.
        :returns: None.
        :raises AppInstallError: If the bytes are not a valid package, lack a ``Payload`` directory,
            or if the installation fails.
        """
        ipcc_mode = classify_zip_file(package_bytes).is_ipcc()

        if options is None:
            options = {}

        if ipcc_mode:
            options["PackageType"] = "CarrierBundle"

        async with AfcService(self.lockdown) as afc:
            fpath = f"{TEMP_REMOTE_BASEDIR}/{uuid.uuid4()}.{'ipcc' if ipcc_mode else 'ipa'}"
            try:
                if not ipcc_mode:
                    await afc.makedirs(TEMP_REMOTE_BASEDIR)
                    await afc.set_file_contents(fpath, package_bytes)
                else:
                    await self.upload_ipcc_from_bytes(package_bytes, fpath, afc)

                await self.send_package(cmd, options, handler, fpath, *args)
            finally:
                await afc.rm_single(fpath, force=True)

    @str_to_path("package_path")
    async def install_from_local(
        self,
        package_path: Path,
        cmd: str = "Install",
        options: Optional[dict] = None,
        handler: Optional[Callable] = None,
        developer: bool = False,
        *args,
    ) -> None:
        """
        Install an app or carrier bundle from a local path.

        Dispatches on ``package_path``: a ``.ipcc`` suffix is treated as a carrier bundle
        (``PackageType`` forced to ``"CarrierBundle"``); a directory is treated as an unpackaged
        app and zipped into an ``.ipa`` via `create_ipa_contents_from_directory`; any other
        path is read as ``.ipa`` bytes. When ``developer`` is set, ``PackageType`` is forced to
        ``"Developer"``. The payload is uploaded to a temporary path under
        ``/PublicStaging/pymobiledevice3`` via AFC, the command is dispatched with
        `send_package`, and the temporary file is removed afterwards.

        :param package_path: Local path to a ``.ipa`` file, a ``.ipcc`` file, or an app directory.
            String values are coerced to `Path`.
        :param cmd: Installation-proxy command name, sent as ``Command``. Defaults to ``"Install"``.
        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
            progress response. ``None`` disables progress callbacks.
        :param developer: When ``True``, sets ``PackageType`` to ``"Developer"`` in the options.
        :param args: Extra positional arguments forwarded to ``handler``.
        :returns: None.
        :raises AppInstallError: If the installation fails.
        """
        ipcc_mode = package_path.suffix == ".ipcc"

        if options is None:
            options = {}

        if ipcc_mode:
            options["PackageType"] = "CarrierBundle"
        else:
            if package_path.is_dir():
                # treat as app, convert into an ipa
                ipa_contents = create_ipa_contents_from_directory(str(package_path))
            else:
                # treat as ipa
                ipa_contents = package_path.read_bytes()

        if developer:
            options["PackageType"] = "Developer"

        async with AfcService(self.lockdown) as afc:
            fname = f"{TEMP_REMOTE_BASEDIR}/{uuid.uuid4()}.{'ipcc' if ipcc_mode else 'ipa'}"
            try:
                if not ipcc_mode:
                    await afc.makedirs(TEMP_REMOTE_BASEDIR)
                    await afc.set_file_contents(fname, ipa_contents)

                else:
                    await self.upload_ipcc_from_path(package_path, fname, afc)

                await self.send_package(cmd, options, handler, fname, *args)
            finally:
                await afc.rm_single(fname, force=True)

    async def send_package(self, cmd: str, options: Optional[dict], handler: Callable, package_path: str, *args):
        """
        Send an install/upgrade command for a package already staged on the device, and wait for completion.

        Sends ``{"Command": cmd, "ClientOptions": options, "PackagePath": package_path}`` to the
        service, then consumes progress/status responses until the operation completes. Typically
        invoked by `install_from_local` and `install_from_bytes` after uploading the
        package via AFC.

        :param cmd: Installation-proxy command name, sent as ``Command``.
        :param options: Command options, sent as ``ClientOptions``.
        :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
            progress response. ``None`` disables progress callbacks.
        :param package_path: On-device path to the staged package, sent as ``PackagePath``.
        :param args: Extra positional arguments forwarded to ``handler``.
        :returns: None.
        :raises AppInstallError: If the service reports an error or finishes without a ``Complete`` status.
        """
        await self.service.send_plist({
            "Command": cmd,
            "ClientOptions": options,
            "PackagePath": package_path,
        })

        await self._watch_completion(handler, args)

    async def upload_ipcc_from_path(self, file: Path, remote_path: str, afc_client: AfcService) -> None:
        """
        Upload a local IPCC (carrier bundle) zip to the device, unpacking it into the remote path.

        Reads the file into memory and forwards it to the unpacking uploader, which recreates the
        zip's directory tree under ``remote_path`` on the device via the given AFC client.

        :param file: Local path to the ``.ipcc`` zip file.
        :param remote_path: Destination directory on the device where the bundle is unpacked.
        :param afc_client: Connected `AfcService` used to write
            the files.
        :returns: None.
        """
        with file.open("rb") as fb:
            file_name = file.name
            file_stream = BytesIO(fb.read())
            self.logger.info(f"Uploading {file_name} contents..")
            await self._upload_ipcc(file_stream, afc_client, remote_path)

    async def upload_ipcc_from_bytes(self, file_bytes: bytes, remote_path: str, afc_client: AfcService) -> None:
        """
        Upload an in-memory IPCC (carrier bundle) zip to the device, unpacking it into the remote path.

        Wraps the bytes in a stream and forwards them to the unpacking uploader, which recreates the
        zip's directory tree under ``remote_path`` on the device via the given AFC client.

        :param file_bytes: Raw bytes of the ``.ipcc`` zip.
        :param remote_path: Destination directory on the device where the bundle is unpacked.
        :param afc_client: Connected `AfcService` used to write
            the files.
        :returns: None.
        """
        file_stream = BytesIO(file_bytes)
        self.logger.info("Uploading IPCC from given bytes..")
        await self._upload_ipcc(file_stream, afc_client, remote_path)

    async def _upload_ipcc(self, file_stream: BytesIO, afc_client: AfcService, dst: str) -> None:
        self.logger.info(f"Uploading {dst} contents..")
        await afc_client.makedirs(dst)

        # we unpack it and upload it directly instead of saving it in a temp folder
        with ZipFile(file_stream, "r") as file_zip:
            for file_name in file_zip.namelist():
                if file_name.endswith(("/", "\\")):
                    await afc_client.makedirs(f"{dst}/{file_name}")
                    continue

                with file_zip.open(file_name) as inside_file_zip:
                    file_data = inside_file_zip.read()
                    await afc_client.makedirs(dst)
                    await afc_client.set_file_contents(f"{dst}/{file_name}", file_data)

        self.logger.info("Upload complete.")

    async def check_capabilities_match(
        self, capabilities: Optional[dict] = None, options: Optional[dict] = None
    ) -> dict:
        """
        Ask the device whether it satisfies a set of app capabilities.

        Sends the ``"CheckCapabilitiesMatch"`` command. When ``capabilities`` is provided it is
        sent under the ``Capabilities`` key; otherwise no capabilities key is sent. Returns the
        single ``LookupResult`` value from the response.

        :param capabilities: Capabilities to check, sent as ``Capabilities``. If falsy, the key is
            omitted.
        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :returns: The ``LookupResult`` value from the response describing the matched capabilities,
            or ``None`` if absent.
        """
        if options is None:
            options = {}
        cmd = {"Command": "CheckCapabilitiesMatch", "ClientOptions": options}

        if capabilities:
            cmd["Capabilities"] = capabilities

        await self.service.send_plist(cmd)
        return (await self.service.recv_plist()).get("LookupResult")

    async def browse(self, options: Optional[dict] = None, attributes: Optional[list[str]] = None) -> list[dict]:
        """
        Enumerate installed apps via the ``"Browse"`` command.

        Sends the command and accumulates each response's ``CurrentList`` entries until the service
        reports a ``Complete`` status (or returns an empty response).

        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :param attributes: When provided, set as the ``ReturnAttributes`` option to limit which
            per-app attributes are returned.
        :returns: A list of per-app info dictionaries collected from all ``CurrentList`` responses.
        """
        if options is None:
            options = {}
        if attributes:
            options["ReturnAttributes"] = attributes

        cmd = {"Command": "Browse", "ClientOptions": options}

        await self.service.send_plist(cmd)

        result = []
        while True:
            response = await self.service.recv_plist()
            if not response:
                break

            data = response.get("CurrentList")
            if data is not None:
                result += data

            if response.get("Status") == "Complete":
                break

        return result

    async def lookup(self, options: Optional[dict] = None) -> dict:
        """
        Look up installed apps via the ``"Lookup"`` command.

        Sends the command and returns the single ``LookupResult`` value from the response. The
        options dict (e.g. ``BundleIDs``, ``ApplicationType``, ``ReturnAttributes``) determines
        which apps and attributes are returned.

        :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
        :returns: The ``LookupResult`` mapping bundle identifiers to per-app info dictionaries, or
            ``None`` if absent.
        """
        if options is None:
            options = {}
        cmd = {"Command": "Lookup", "ClientOptions": options}
        await self.service.send_plist(cmd)
        return (await self.service.recv_plist()).get("LookupResult")

    async def get_apps(
        self,
        application_type: str = "Any",
        calculate_sizes: bool = False,
        bundle_identifiers: Optional[list[str]] = None,
        show_placeholders: bool = False,
    ) -> dict[str, dict]:
        """
        Retrieve installed apps, keyed by bundle identifier.

        Builds a lookup query from the given filters and calls `lookup`. When
        ``calculate_sizes`` is set, a second lookup is issued requesting ``CFBundleIdentifier``,
        ``StaticDiskUsage`` and ``DynamicDiskUsage``, and those size attributes are merged into the
        per-app entries.

        :param application_type: Value for the ``ApplicationType`` option. Defaults to ``"Any"``;
            other values include ``"System"`` and ``"User"``.
        :param calculate_sizes: When ``True``, also fetch and merge static/dynamic disk usage for
            each app.
        :param bundle_identifiers: When provided, restrict the query to these bundle identifiers via
            the ``BundleIDs`` option.
        :param show_placeholders: When ``True``, set the ``ShowPlaceholders`` option to include
            placeholder (e.g. installing/downloading) apps.
            See <https://github.com/doronz88/pymobiledevice3/issues/1602> for details.
        :returns: A dictionary mapping each bundle identifier to its per-app info dictionary.
        """
        options = {}
        if bundle_identifiers is not None:
            options["BundleIDs"] = bundle_identifiers

        options["ApplicationType"] = application_type
        if show_placeholders:
            options["ShowPlaceholders"] = True
        result = await self.lookup(options)
        if calculate_sizes:
            options.update(GET_APPS_ADDITIONAL_INFO)
            additional_info = await self.lookup(options)
            for bundle_identifier, app in additional_info.items():
                result[bundle_identifier].update(app)
        return result

send_cmd_for_bundle_identifier async

send_cmd_for_bundle_identifier(bundle_identifier: str, cmd: str = 'Archive', options: Optional[dict] = None, handler: Optional[Callable] = None, *args) -> None

Send a command that targets an installed app by its bundle identifier and wait for completion.

Sends a plist of the form {"Command": cmd, "ApplicationIdentifier": bundle_identifier, "ClientOptions": options} to the service, then consumes progress/status responses until the operation completes. Used to back restore and uninstall.

Parameters:

Name Type Description Default
bundle_identifier str

Bundle identifier of the target app, sent as ApplicationIdentifier.

required
cmd str

Installation-proxy command name, sent as Command. Defaults to "Archive".

'Archive'
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None
handler Optional[Callable]

Progress callback invoked as handler(percent_complete, *args) on each progress response. None disables progress callbacks.

None
args

Extra positional arguments forwarded to handler.

()

Returns:

Type Description
None

None.

Raises:

Type Description
AppInstallError

If the service reports an error or finishes without a Complete status.

Source code in pymobiledevice3/services/installation_proxy.py
async def send_cmd_for_bundle_identifier(
    self,
    bundle_identifier: str,
    cmd: str = "Archive",
    options: Optional[dict] = None,
    handler: Optional[Callable] = None,
    *args,
) -> None:
    """
    Send a command that targets an installed app by its bundle identifier and wait for completion.

    Sends a plist of the form ``{"Command": cmd, "ApplicationIdentifier": bundle_identifier,
    "ClientOptions": options}`` to the service, then consumes progress/status responses until
    the operation completes. Used to back `restore` and `uninstall`.

    :param bundle_identifier: Bundle identifier of the target app, sent as ``ApplicationIdentifier``.
    :param cmd: Installation-proxy command name, sent as ``Command``. Defaults to ``"Archive"``.
    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
        progress response. ``None`` disables progress callbacks.
    :param args: Extra positional arguments forwarded to ``handler``.
    :returns: None.
    :raises AppInstallError: If the service reports an error or finishes without a ``Complete`` status.
    """
    cmd: dict = {"Command": cmd, "ApplicationIdentifier": bundle_identifier}

    if options is None:
        options = {}

    cmd.update({"ClientOptions": options})
    await self.service.send_plist(cmd)
    await self._watch_completion(handler, *args)

upgrade async

upgrade(ipa_path: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args) -> None

Upgrade an installed app from a local package.

Delegates to install_from_local with the "Upgrade" command.

Parameters:

Name Type Description Default
ipa_path str

Local path to the package (.ipa/.ipcc or an app directory).

required
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None
handler Optional[Callable]

Progress callback invoked as handler(percent_complete, *args) on each progress response. None disables progress callbacks.

None
args

Extra positional arguments forwarded to handler.

()

Returns:

Type Description
None

None.

Raises:

Type Description
AppInstallError

If the upgrade fails.

Source code in pymobiledevice3/services/installation_proxy.py
async def upgrade(
    self, ipa_path: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args
) -> None:
    """
    Upgrade an installed app from a local package.

    Delegates to `install_from_local` with the ``"Upgrade"`` command.

    :param ipa_path: Local path to the package (``.ipa``/``.ipcc`` or an app directory).
    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
        progress response. ``None`` disables progress callbacks.
    :param args: Extra positional arguments forwarded to ``handler``.
    :returns: None.
    :raises AppInstallError: If the upgrade fails.
    """
    await self.install_from_local(ipa_path, "Upgrade", options, handler, args)

restore async

restore(bundle_identifier: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args) -> None

Restore a previously archived app, identified by its bundle identifier.

Sends the "Restore" command via send_cmd_for_bundle_identifier.

Parameters:

Name Type Description Default
bundle_identifier str

Bundle identifier of the app to restore.

required
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None
handler Optional[Callable]

Progress callback invoked as handler(percent_complete, *args) on each progress response. None disables progress callbacks.

None
args

Extra positional arguments forwarded to handler.

()

Returns:

Type Description
None

None.

Raises:

Type Description
AppInstallError

If the restore fails.

Source code in pymobiledevice3/services/installation_proxy.py
async def restore(
    self, bundle_identifier: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args
) -> None:
    """
    Restore a previously archived app, identified by its bundle identifier.

    Sends the ``"Restore"`` command via `send_cmd_for_bundle_identifier`.

    :param bundle_identifier: Bundle identifier of the app to restore.
    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
        progress response. ``None`` disables progress callbacks.
    :param args: Extra positional arguments forwarded to ``handler``.
    :returns: None.
    :raises AppInstallError: If the restore fails.
    """
    await self.send_cmd_for_bundle_identifier(bundle_identifier, "Restore", options, handler, args)

uninstall async

uninstall(bundle_identifier: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args) -> None

Uninstall an app, identified by its bundle identifier.

Sends the "Uninstall" command via send_cmd_for_bundle_identifier.

Parameters:

Name Type Description Default
bundle_identifier str

Bundle identifier of the app to uninstall.

required
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None
handler Optional[Callable]

Progress callback invoked as handler(percent_complete, *args) on each progress response. None disables progress callbacks.

None
args

Extra positional arguments forwarded to handler.

()

Returns:

Type Description
None

None.

Raises:

Type Description
AppInstallError

If the uninstall fails.

Source code in pymobiledevice3/services/installation_proxy.py
async def uninstall(
    self, bundle_identifier: str, options: Optional[dict] = None, handler: Optional[Callable] = None, *args
) -> None:
    """
    Uninstall an app, identified by its bundle identifier.

    Sends the ``"Uninstall"`` command via `send_cmd_for_bundle_identifier`.

    :param bundle_identifier: Bundle identifier of the app to uninstall.
    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
        progress response. ``None`` disables progress callbacks.
    :param args: Extra positional arguments forwarded to ``handler``.
    :returns: None.
    :raises AppInstallError: If the uninstall fails.
    """
    await self.send_cmd_for_bundle_identifier(bundle_identifier, "Uninstall", options, handler, args)

install_from_bytes async

install_from_bytes(package_bytes: bytes, cmd: str = 'Install', options: Optional[dict] = None, handler: Optional[Callable] = None, *args) -> None

Install an app or carrier bundle from a zipped package held in memory.

The package type is detected from the zip contents (see classify_zip_file): a .app payload is treated as an .ipa, a .bundle payload as an .ipcc. For IPCC packages, PackageType is forced to "CarrierBundle" in the options. The bytes are uploaded to a temporary path under /PublicStaging/pymobiledevice3 via AFC, the command is dispatched with send_package, and the temporary file is removed afterwards.

Parameters:

Name Type Description Default
package_bytes bytes

Raw bytes of the .ipa/.ipcc zip package.

required
cmd str

Installation-proxy command name, sent as Command. Defaults to "Install".

'Install'
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None
handler Optional[Callable]

Progress callback invoked as handler(percent_complete, *args) on each progress response. None disables progress callbacks.

None
args

Extra positional arguments forwarded to handler.

()

Returns:

Type Description
None

None.

Raises:

Type Description
AppInstallError

If the bytes are not a valid package, lack a Payload directory, or if the installation fails.

Source code in pymobiledevice3/services/installation_proxy.py
async def install_from_bytes(
    self,
    package_bytes: bytes,
    cmd: str = "Install",
    options: Optional[dict] = None,
    handler: Optional[Callable] = None,
    *args,
) -> None:
    """
    Install an app or carrier bundle from a zipped package held in memory.

    The package type is detected from the zip contents (see `classify_zip_file`): a
    ``.app`` payload is treated as an ``.ipa``, a ``.bundle`` payload as an ``.ipcc``. For
    IPCC packages, ``PackageType`` is forced to ``"CarrierBundle"`` in the options. The bytes
    are uploaded to a temporary path under ``/PublicStaging/pymobiledevice3`` via AFC, the
    command is dispatched with `send_package`, and the temporary file is removed
    afterwards.

    :param package_bytes: Raw bytes of the ``.ipa``/``.ipcc`` zip package.
    :param cmd: Installation-proxy command name, sent as ``Command``. Defaults to ``"Install"``.
    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
        progress response. ``None`` disables progress callbacks.
    :param args: Extra positional arguments forwarded to ``handler``.
    :returns: None.
    :raises AppInstallError: If the bytes are not a valid package, lack a ``Payload`` directory,
        or if the installation fails.
    """
    ipcc_mode = classify_zip_file(package_bytes).is_ipcc()

    if options is None:
        options = {}

    if ipcc_mode:
        options["PackageType"] = "CarrierBundle"

    async with AfcService(self.lockdown) as afc:
        fpath = f"{TEMP_REMOTE_BASEDIR}/{uuid.uuid4()}.{'ipcc' if ipcc_mode else 'ipa'}"
        try:
            if not ipcc_mode:
                await afc.makedirs(TEMP_REMOTE_BASEDIR)
                await afc.set_file_contents(fpath, package_bytes)
            else:
                await self.upload_ipcc_from_bytes(package_bytes, fpath, afc)

            await self.send_package(cmd, options, handler, fpath, *args)
        finally:
            await afc.rm_single(fpath, force=True)

install_from_local async

install_from_local(package_path: Path, cmd: str = 'Install', options: Optional[dict] = None, handler: Optional[Callable] = None, developer: bool = False, *args) -> None

Install an app or carrier bundle from a local path.

Dispatches on package_path: a .ipcc suffix is treated as a carrier bundle (PackageType forced to "CarrierBundle"); a directory is treated as an unpackaged app and zipped into an .ipa via create_ipa_contents_from_directory; any other path is read as .ipa bytes. When developer is set, PackageType is forced to "Developer". The payload is uploaded to a temporary path under /PublicStaging/pymobiledevice3 via AFC, the command is dispatched with send_package, and the temporary file is removed afterwards.

Parameters:

Name Type Description Default
package_path Path

Local path to a .ipa file, a .ipcc file, or an app directory. String values are coerced to Path.

required
cmd str

Installation-proxy command name, sent as Command. Defaults to "Install".

'Install'
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None
handler Optional[Callable]

Progress callback invoked as handler(percent_complete, *args) on each progress response. None disables progress callbacks.

None
developer bool

When True, sets PackageType to "Developer" in the options.

False
args

Extra positional arguments forwarded to handler.

()

Returns:

Type Description
None

None.

Raises:

Type Description
AppInstallError

If the installation fails.

Source code in pymobiledevice3/services/installation_proxy.py
@str_to_path("package_path")
async def install_from_local(
    self,
    package_path: Path,
    cmd: str = "Install",
    options: Optional[dict] = None,
    handler: Optional[Callable] = None,
    developer: bool = False,
    *args,
) -> None:
    """
    Install an app or carrier bundle from a local path.

    Dispatches on ``package_path``: a ``.ipcc`` suffix is treated as a carrier bundle
    (``PackageType`` forced to ``"CarrierBundle"``); a directory is treated as an unpackaged
    app and zipped into an ``.ipa`` via `create_ipa_contents_from_directory`; any other
    path is read as ``.ipa`` bytes. When ``developer`` is set, ``PackageType`` is forced to
    ``"Developer"``. The payload is uploaded to a temporary path under
    ``/PublicStaging/pymobiledevice3`` via AFC, the command is dispatched with
    `send_package`, and the temporary file is removed afterwards.

    :param package_path: Local path to a ``.ipa`` file, a ``.ipcc`` file, or an app directory.
        String values are coerced to `Path`.
    :param cmd: Installation-proxy command name, sent as ``Command``. Defaults to ``"Install"``.
    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
        progress response. ``None`` disables progress callbacks.
    :param developer: When ``True``, sets ``PackageType`` to ``"Developer"`` in the options.
    :param args: Extra positional arguments forwarded to ``handler``.
    :returns: None.
    :raises AppInstallError: If the installation fails.
    """
    ipcc_mode = package_path.suffix == ".ipcc"

    if options is None:
        options = {}

    if ipcc_mode:
        options["PackageType"] = "CarrierBundle"
    else:
        if package_path.is_dir():
            # treat as app, convert into an ipa
            ipa_contents = create_ipa_contents_from_directory(str(package_path))
        else:
            # treat as ipa
            ipa_contents = package_path.read_bytes()

    if developer:
        options["PackageType"] = "Developer"

    async with AfcService(self.lockdown) as afc:
        fname = f"{TEMP_REMOTE_BASEDIR}/{uuid.uuid4()}.{'ipcc' if ipcc_mode else 'ipa'}"
        try:
            if not ipcc_mode:
                await afc.makedirs(TEMP_REMOTE_BASEDIR)
                await afc.set_file_contents(fname, ipa_contents)

            else:
                await self.upload_ipcc_from_path(package_path, fname, afc)

            await self.send_package(cmd, options, handler, fname, *args)
        finally:
            await afc.rm_single(fname, force=True)

send_package async

send_package(cmd: str, options: Optional[dict], handler: Callable, package_path: str, *args)

Send an install/upgrade command for a package already staged on the device, and wait for completion.

Sends {"Command": cmd, "ClientOptions": options, "PackagePath": package_path} to the service, then consumes progress/status responses until the operation completes. Typically invoked by install_from_local and install_from_bytes after uploading the package via AFC.

Parameters:

Name Type Description Default
cmd str

Installation-proxy command name, sent as Command.

required
options Optional[dict]

Command options, sent as ClientOptions.

required
handler Callable

Progress callback invoked as handler(percent_complete, *args) on each progress response. None disables progress callbacks.

required
package_path str

On-device path to the staged package, sent as PackagePath.

required
args

Extra positional arguments forwarded to handler.

()

Returns:

Type Description

None.

Raises:

Type Description
AppInstallError

If the service reports an error or finishes without a Complete status.

Source code in pymobiledevice3/services/installation_proxy.py
async def send_package(self, cmd: str, options: Optional[dict], handler: Callable, package_path: str, *args):
    """
    Send an install/upgrade command for a package already staged on the device, and wait for completion.

    Sends ``{"Command": cmd, "ClientOptions": options, "PackagePath": package_path}`` to the
    service, then consumes progress/status responses until the operation completes. Typically
    invoked by `install_from_local` and `install_from_bytes` after uploading the
    package via AFC.

    :param cmd: Installation-proxy command name, sent as ``Command``.
    :param options: Command options, sent as ``ClientOptions``.
    :param handler: Progress callback invoked as ``handler(percent_complete, *args)`` on each
        progress response. ``None`` disables progress callbacks.
    :param package_path: On-device path to the staged package, sent as ``PackagePath``.
    :param args: Extra positional arguments forwarded to ``handler``.
    :returns: None.
    :raises AppInstallError: If the service reports an error or finishes without a ``Complete`` status.
    """
    await self.service.send_plist({
        "Command": cmd,
        "ClientOptions": options,
        "PackagePath": package_path,
    })

    await self._watch_completion(handler, args)

upload_ipcc_from_path async

upload_ipcc_from_path(file: Path, remote_path: str, afc_client: AfcService) -> None

Upload a local IPCC (carrier bundle) zip to the device, unpacking it into the remote path.

Reads the file into memory and forwards it to the unpacking uploader, which recreates the zip's directory tree under remote_path on the device via the given AFC client.

Parameters:

Name Type Description Default
file Path

Local path to the .ipcc zip file.

required
remote_path str

Destination directory on the device where the bundle is unpacked.

required
afc_client AfcService

Connected AfcService used to write the files.

required

Returns:

Type Description
None

None.

Source code in pymobiledevice3/services/installation_proxy.py
async def upload_ipcc_from_path(self, file: Path, remote_path: str, afc_client: AfcService) -> None:
    """
    Upload a local IPCC (carrier bundle) zip to the device, unpacking it into the remote path.

    Reads the file into memory and forwards it to the unpacking uploader, which recreates the
    zip's directory tree under ``remote_path`` on the device via the given AFC client.

    :param file: Local path to the ``.ipcc`` zip file.
    :param remote_path: Destination directory on the device where the bundle is unpacked.
    :param afc_client: Connected `AfcService` used to write
        the files.
    :returns: None.
    """
    with file.open("rb") as fb:
        file_name = file.name
        file_stream = BytesIO(fb.read())
        self.logger.info(f"Uploading {file_name} contents..")
        await self._upload_ipcc(file_stream, afc_client, remote_path)

upload_ipcc_from_bytes async

upload_ipcc_from_bytes(file_bytes: bytes, remote_path: str, afc_client: AfcService) -> None

Upload an in-memory IPCC (carrier bundle) zip to the device, unpacking it into the remote path.

Wraps the bytes in a stream and forwards them to the unpacking uploader, which recreates the zip's directory tree under remote_path on the device via the given AFC client.

Parameters:

Name Type Description Default
file_bytes bytes

Raw bytes of the .ipcc zip.

required
remote_path str

Destination directory on the device where the bundle is unpacked.

required
afc_client AfcService

Connected AfcService used to write the files.

required

Returns:

Type Description
None

None.

Source code in pymobiledevice3/services/installation_proxy.py
async def upload_ipcc_from_bytes(self, file_bytes: bytes, remote_path: str, afc_client: AfcService) -> None:
    """
    Upload an in-memory IPCC (carrier bundle) zip to the device, unpacking it into the remote path.

    Wraps the bytes in a stream and forwards them to the unpacking uploader, which recreates the
    zip's directory tree under ``remote_path`` on the device via the given AFC client.

    :param file_bytes: Raw bytes of the ``.ipcc`` zip.
    :param remote_path: Destination directory on the device where the bundle is unpacked.
    :param afc_client: Connected `AfcService` used to write
        the files.
    :returns: None.
    """
    file_stream = BytesIO(file_bytes)
    self.logger.info("Uploading IPCC from given bytes..")
    await self._upload_ipcc(file_stream, afc_client, remote_path)

check_capabilities_match async

check_capabilities_match(capabilities: Optional[dict] = None, options: Optional[dict] = None) -> dict

Ask the device whether it satisfies a set of app capabilities.

Sends the "CheckCapabilitiesMatch" command. When capabilities is provided it is sent under the Capabilities key; otherwise no capabilities key is sent. Returns the single LookupResult value from the response.

Parameters:

Name Type Description Default
capabilities Optional[dict]

Capabilities to check, sent as Capabilities. If falsy, the key is omitted.

None
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None

Returns:

Type Description
dict

The LookupResult value from the response describing the matched capabilities, or None if absent.

Source code in pymobiledevice3/services/installation_proxy.py
async def check_capabilities_match(
    self, capabilities: Optional[dict] = None, options: Optional[dict] = None
) -> dict:
    """
    Ask the device whether it satisfies a set of app capabilities.

    Sends the ``"CheckCapabilitiesMatch"`` command. When ``capabilities`` is provided it is
    sent under the ``Capabilities`` key; otherwise no capabilities key is sent. Returns the
    single ``LookupResult`` value from the response.

    :param capabilities: Capabilities to check, sent as ``Capabilities``. If falsy, the key is
        omitted.
    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :returns: The ``LookupResult`` value from the response describing the matched capabilities,
        or ``None`` if absent.
    """
    if options is None:
        options = {}
    cmd = {"Command": "CheckCapabilitiesMatch", "ClientOptions": options}

    if capabilities:
        cmd["Capabilities"] = capabilities

    await self.service.send_plist(cmd)
    return (await self.service.recv_plist()).get("LookupResult")

browse async

browse(options: Optional[dict] = None, attributes: Optional[list[str]] = None) -> list[dict]

Enumerate installed apps via the "Browse" command.

Sends the command and accumulates each response's CurrentList entries until the service reports a Complete status (or returns an empty response).

Parameters:

Name Type Description Default
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None
attributes Optional[list[str]]

When provided, set as the ReturnAttributes option to limit which per-app attributes are returned.

None

Returns:

Type Description
list[dict]

A list of per-app info dictionaries collected from all CurrentList responses.

Source code in pymobiledevice3/services/installation_proxy.py
async def browse(self, options: Optional[dict] = None, attributes: Optional[list[str]] = None) -> list[dict]:
    """
    Enumerate installed apps via the ``"Browse"`` command.

    Sends the command and accumulates each response's ``CurrentList`` entries until the service
    reports a ``Complete`` status (or returns an empty response).

    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :param attributes: When provided, set as the ``ReturnAttributes`` option to limit which
        per-app attributes are returned.
    :returns: A list of per-app info dictionaries collected from all ``CurrentList`` responses.
    """
    if options is None:
        options = {}
    if attributes:
        options["ReturnAttributes"] = attributes

    cmd = {"Command": "Browse", "ClientOptions": options}

    await self.service.send_plist(cmd)

    result = []
    while True:
        response = await self.service.recv_plist()
        if not response:
            break

        data = response.get("CurrentList")
        if data is not None:
            result += data

        if response.get("Status") == "Complete":
            break

    return result

lookup async

lookup(options: Optional[dict] = None) -> dict

Look up installed apps via the "Lookup" command.

Sends the command and returns the single LookupResult value from the response. The options dict (e.g. BundleIDs, ApplicationType, ReturnAttributes) determines which apps and attributes are returned.

Parameters:

Name Type Description Default
options Optional[dict]

Command options, sent as ClientOptions. None is sent as an empty dict.

None

Returns:

Type Description
dict

The LookupResult mapping bundle identifiers to per-app info dictionaries, or None if absent.

Source code in pymobiledevice3/services/installation_proxy.py
async def lookup(self, options: Optional[dict] = None) -> dict:
    """
    Look up installed apps via the ``"Lookup"`` command.

    Sends the command and returns the single ``LookupResult`` value from the response. The
    options dict (e.g. ``BundleIDs``, ``ApplicationType``, ``ReturnAttributes``) determines
    which apps and attributes are returned.

    :param options: Command options, sent as ``ClientOptions``. ``None`` is sent as an empty dict.
    :returns: The ``LookupResult`` mapping bundle identifiers to per-app info dictionaries, or
        ``None`` if absent.
    """
    if options is None:
        options = {}
    cmd = {"Command": "Lookup", "ClientOptions": options}
    await self.service.send_plist(cmd)
    return (await self.service.recv_plist()).get("LookupResult")

get_apps async

get_apps(application_type: str = 'Any', calculate_sizes: bool = False, bundle_identifiers: Optional[list[str]] = None, show_placeholders: bool = False) -> dict[str, dict]

Retrieve installed apps, keyed by bundle identifier.

Builds a lookup query from the given filters and calls lookup. When calculate_sizes is set, a second lookup is issued requesting CFBundleIdentifier, StaticDiskUsage and DynamicDiskUsage, and those size attributes are merged into the per-app entries.

Parameters:

Name Type Description Default
application_type str

Value for the ApplicationType option. Defaults to "Any"; other values include "System" and "User".

'Any'
calculate_sizes bool

When True, also fetch and merge static/dynamic disk usage for each app.

False
bundle_identifiers Optional[list[str]]

When provided, restrict the query to these bundle identifiers via the BundleIDs option.

None
show_placeholders bool

When True, set the ShowPlaceholders option to include placeholder (e.g. installing/downloading) apps. See https://github.com/doronz88/pymobiledevice3/issues/1602 for details.

False

Returns:

Type Description
dict[str, dict]

A dictionary mapping each bundle identifier to its per-app info dictionary.

Source code in pymobiledevice3/services/installation_proxy.py
async def get_apps(
    self,
    application_type: str = "Any",
    calculate_sizes: bool = False,
    bundle_identifiers: Optional[list[str]] = None,
    show_placeholders: bool = False,
) -> dict[str, dict]:
    """
    Retrieve installed apps, keyed by bundle identifier.

    Builds a lookup query from the given filters and calls `lookup`. When
    ``calculate_sizes`` is set, a second lookup is issued requesting ``CFBundleIdentifier``,
    ``StaticDiskUsage`` and ``DynamicDiskUsage``, and those size attributes are merged into the
    per-app entries.

    :param application_type: Value for the ``ApplicationType`` option. Defaults to ``"Any"``;
        other values include ``"System"`` and ``"User"``.
    :param calculate_sizes: When ``True``, also fetch and merge static/dynamic disk usage for
        each app.
    :param bundle_identifiers: When provided, restrict the query to these bundle identifiers via
        the ``BundleIDs`` option.
    :param show_placeholders: When ``True``, set the ``ShowPlaceholders`` option to include
        placeholder (e.g. installing/downloading) apps.
        See <https://github.com/doronz88/pymobiledevice3/issues/1602> for details.
    :returns: A dictionary mapping each bundle identifier to its per-app info dictionary.
    """
    options = {}
    if bundle_identifiers is not None:
        options["BundleIDs"] = bundle_identifiers

    options["ApplicationType"] = application_type
    if show_placeholders:
        options["ShowPlaceholders"] = True
    result = await self.lookup(options)
    if calculate_sizes:
        options.update(GET_APPS_ADDITIONAL_INFO)
        additional_info = await self.lookup(options)
        for bundle_identifier, app in additional_info.items():
            result[bundle_identifier].update(app)
    return result

pymobiledevice3.services.afc.AfcService

Bases: LockdownService

Apple File Connection (AFC) Service for iOS device file system access.

This service provides full file system access to the /var/mobile/Media directory on iOS devices. It supports standard file operations including read, write, delete, rename, and directory operations.

The service communicates using a custom binary protocol with operation codes for different file system operations. AFC responses echo the packet_num from the request, enabling full concurrent operation: multiple callers may issue operations simultaneously and a background reader task routes each response to the correct waiter via a per-packet_num Future.

Attributes: SERVICE_NAME: Service identifier for lockdown-based connections RSD_SERVICE_NAME: Service identifier for RSD-based connections packet_num: Counter for tracking packet sequence numbers

Source code in pymobiledevice3/services/afc.py
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
class AfcService(LockdownService):
    """
    Apple File Connection (AFC) Service for iOS device file system access.

    This service provides full file system access to the /var/mobile/Media directory
    on iOS devices. It supports standard file operations including read, write, delete,
    rename, and directory operations.

    The service communicates using a custom binary protocol with operation codes for
    different file system operations. AFC responses echo the ``packet_num`` from the
    request, enabling full concurrent operation: multiple callers may issue operations
    simultaneously and a background reader task routes each response to the correct waiter
    via a per-``packet_num`` Future.

    Attributes:
        SERVICE_NAME: Service identifier for lockdown-based connections
        RSD_SERVICE_NAME: Service identifier for RSD-based connections
        packet_num: Counter for tracking packet sequence numbers
    """

    SERVICE_NAME = "com.apple.afc"
    RSD_SERVICE_NAME = "com.apple.afc.shim.remote"

    def __init__(self, lockdown: LockdownServiceProvider, service_name: Optional[str] = None):
        """
        Initialize the AFC service.

        :param lockdown: Lockdown service provider for establishing connection
        :param service_name: Optional service name override. Auto-detected if None
        """
        if service_name is None:
            service_name = self.SERVICE_NAME if isinstance(lockdown, LockdownClient) else self.RSD_SERVICE_NAME
        super().__init__(lockdown, service_name)
        self.packet_num = 0
        # Demux state — populated in __aenter__, torn down in __aexit__
        self._afc_pending: dict[int, asyncio.Future] = {}
        self._afc_write_lock = asyncio.Lock()
        self._afc_reader_task: Optional[asyncio.Task] = None

    async def __aenter__(self):
        await super().__aenter__()
        self._afc_reader_task = asyncio.create_task(self._afc_reader_loop(), name="afc-reader")
        return self

    async def aclose(self) -> None:
        if self._afc_reader_task is not None:
            self._afc_reader_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._afc_reader_task
            self._afc_reader_task = None
        for fut in self._afc_pending.values():
            if not fut.done():
                fut.set_exception(ConnectionTerminatedError())
        self._afc_pending.clear()
        await super().close()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()

    async def _afc_reader_loop(self) -> None:
        """Background task: read AFC response packets and route them to waiting callers."""
        try:
            while True:
                header_bytes = await self.service.recvall(afc_header_t.sizeof())
                if not header_bytes:
                    break
                header = afc_header_t.parse(header_bytes)
                assert header.entire_length >= afc_header_t.sizeof()
                length = header.entire_length - afc_header_t.sizeof()
                data = await self.service.recvall(length) if length else b""

                status = AfcError.SUCCESS
                if header.operation == AfcOpcode.STATUS:
                    if length != 8:
                        self.logger.error("Status length != 8")
                    status = afc_error_construct.parse(data)
                elif header.operation != AfcOpcode.DATA:
                    self.logger.debug("Unexpected AFC opcode %s", header.operation)

                fut = self._afc_pending.pop(header.packet_num, None)
                if fut is not None and not fut.done():
                    fut.set_result((status, data))
                else:
                    self.logger.warning("AFC: no waiter for packet_num=%d", header.packet_num)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Propagate to all pending waiters so they don't hang
            for fut in self._afc_pending.values():
                if not fut.done():
                    fut.set_exception(e)
            self._afc_pending.clear()

    async def pull(
        self,
        relative_src: str,
        dst: str,
        match: Optional[Pattern] = None,
        callback: Optional[Callable] = None,
        src_dir: str = "",
        ignore_errors: bool = False,
        progress_bar: bool = True,
    ) -> None:
        """
        Pull (download) a file or directory from the device to the local machine.

        The source is resolved through any symbolic link before transfer. A regular file is
        written to ``dst``; if ``dst`` is an existing local directory, the file is placed
        inside it under the source's basename. Files larger than ``MAXIMUM_READ_SIZE`` (4 MB)
        are streamed in chunks. The local file's modification time is set to match the
        device's ``st_mtime``. A directory source is copied recursively into a subdirectory
        of ``dst`` named after the source's basename.

        :param relative_src: Source path on the device, resolved relative to ``src_dir``.
        :param dst: Destination path on the local machine.
        :param match: Optional regex; only entries whose basename matches (via ``search``) are
            copied. Non-matching files are skipped.
        :param callback: Optional callable invoked as ``callback(src, dst)`` after each regular
            file is copied. If it returns an awaitable, the awaitable is awaited.
        :param src_dir: Base directory on the device against which ``relative_src`` is joined.
        :param ignore_errors: If True, per-entry errors during directory recursion are logged
            and skipped instead of propagated.
        :param progress_bar: If True, display a tqdm progress bar while streaming large files.
        """
        src = await self.resolve_path(posixpath.join(src_dir, relative_src))

        if not await self.isdir(src):
            # normal file
            if match is not None and not match.search(posixpath.basename(src)):
                return
            if os.path.isdir(dst):
                dst = os.path.join(dst, os.path.basename(relative_src))
            with open(dst, "wb") as f:
                src_size = (await self.stat(src))["st_size"]
                if src_size <= MAXIMUM_READ_SIZE:
                    f.write(await self.get_file_contents(src))
                else:
                    handle = await self.fopen(src)
                    iterator = range(0, src_size, MAXIMUM_READ_SIZE)
                    if progress_bar:
                        iterator = trange(0, src_size, MAXIMUM_READ_SIZE)
                    for offset in iterator:
                        to_read = min(MAXIMUM_READ_SIZE, src_size - offset)
                        f.write(await self.fread(handle, to_read))
                    await self.fclose(handle)
            os.utime(dst, (os.stat(dst).st_atime, (await self.stat(src))["st_mtime"].timestamp()))
            if callback is not None:
                result = callback(src, dst)
                if inspect.isawaitable(result):
                    await result
        else:
            # directory
            dst_path = pathlib.Path(dst) / os.path.basename(relative_src)
            dst_path.mkdir(parents=True, exist_ok=True)

            for filename in await self.listdir(src):
                src_filename = posixpath.join(src, filename)
                dst_filename = dst_path / filename

                src_filename = await self.resolve_path(src_filename)

                if match is not None and not match.search(posixpath.basename(src_filename)):
                    continue

                try:
                    if await self.isdir(src_filename):
                        dst_filename.mkdir(exist_ok=True)
                        await self.pull(
                            src_filename,
                            str(dst_path),
                            match=match,
                            callback=callback,
                            ignore_errors=ignore_errors,
                            progress_bar=progress_bar,
                        )
                        continue

                    await self.pull(
                        src_filename,
                        str(dst_path),
                        match=match,
                        callback=callback,
                        ignore_errors=ignore_errors,
                        progress_bar=progress_bar,
                    )

                except Exception as afc_exception:
                    if not ignore_errors:
                        raise
                    self.logger.warning(f"(Ignoring) Error: {afc_exception} occurred during the copy of {src_filename}")

    @path_to_str()
    async def exists(self, filename):
        """
        Check whether a path exists on the device.

        Implemented by attempting `stat`; a missing path is reported as False and any
        other AFC error propagates.

        :param filename: Path to check.
        :return: True if the path exists, False if it was not found.
        """
        try:
            await self.stat(filename)
        except AfcFileNotFoundError:
            return False
        return True

    @path_to_str()
    async def wait_exists(self, filename):
        """
        Block until a path exists on the device.

        Polls `exists` every 0.1 seconds until the path appears. Does not return
        until the path exists.

        :param filename: Path to wait for.
        """
        while not await self.exists(filename):
            await asyncio.sleep(0.1)

    @path_to_str()
    async def _push_internal(self, local_path, remote_path, callback=None):
        """
        Internal method for pushing files to the device.

        :param local_path: Local file or directory path
        :param remote_path: Remote destination path on the device
        :param callback: Optional callback function called for each file copied (src, dst)
        """
        if callback is not None:
            callback(local_path, remote_path)

        if not os.path.isdir(local_path):
            # normal file
            try:
                if await self.isdir(remote_path):
                    # Remote is dir.
                    remote_path = posixpath.join(remote_path, os.path.basename(local_path))
            except AfcFileNotFoundError:
                # Remote is file.
                remote_parent = posixpath.dirname(remote_path)
                if not await self.exists(remote_parent):
                    raise
                remote_path = posixpath.join(remote_parent, os.path.basename(remote_path))
            with open(local_path, "rb") as f:
                await self.set_file_contents(remote_path, f.read())
        else:
            # directory
            if not await self.exists(remote_path):
                await self.makedirs(remote_path)

            for filename in os.listdir(local_path):
                local_filename = os.path.join(local_path, filename)
                remote_filename = posixpath.join(remote_path, filename).removeprefix("/")

                if os.path.isdir(local_filename):
                    if not await self.exists(remote_filename):
                        await self.makedirs(remote_filename)
                    await self._push_internal(local_filename, remote_filename, callback=callback)
                    continue

                await self._push_internal(local_filename, remote_filename, callback=callback)

    @path_to_str()
    async def push(self, local_path, remote_path, callback=None):
        """
        Push (upload) a file or directory from the local machine to the device.

        A regular file is written to ``remote_path`` (or, if ``remote_path`` is an existing
        remote directory, into it under the file's basename). A local directory is copied
        recursively into ``remote_path``/<basename of local_path>, creating remote directories
        as needed.

        :param local_path: Source path on the local machine.
        :param remote_path: Destination path on the device.
        :param callback: Optional callable invoked as ``callback(local_path, remote_path)``
            before each entry is transferred.
        """
        if os.path.isdir(local_path):
            remote_path = posixpath.join(remote_path, os.path.basename(local_path))
        await self._push_internal(local_path, remote_path, callback)

    @path_to_str()
    async def rm_single(self, filename: str, force: bool = False) -> bool:
        """
        Remove a single file or empty directory on the device.

        Issues a single ``REMOVE_PATH`` operation; it does not recurse into a non-empty
        directory.

        :param filename: Path to the file or directory to remove.
        :param force: If True, swallow an `AfcException` and return False instead of raising.
        :return: True on success; False if the removal failed and ``force`` is True.
        :raises AfcException: if the removal fails and ``force`` is False.
        """
        try:
            await self._do_operation(
                AfcOpcode.REMOVE_PATH, afc_rm_req_t.build(AfcRmRequest(filename=filename)), filename
            )
        except AfcException:
            if force:
                return False
            raise
        return True

    @path_to_str()
    async def rm(self, filename: str, match: Optional[Pattern] = None, force: bool = False) -> list[str]:
        """
        Recursively remove a file or directory tree on the device.

        For a directory, every entry is removed depth-first and the directory itself is removed
        last. The ``match`` pattern, when given, filters which top-level entries of ``filename``
        are considered (matched against their basename via ``match``); nested entries are always
        removed once their parent is selected.

        :param filename: Path to the file or directory to remove.
        :param match: Optional regex; only directory entries whose basename matches are removed.
            None removes all entries.
        :param force: If True, suppress the final `AfcException` and instead return the
            list of paths that could not be deleted.
        :return: List of paths that could not be deleted (empty on full success).
        :raises AfcException: if any path could not be deleted and ``force`` is False.
        """
        if not await self.exists(filename) and not await self.rm_single(filename, force=force):
            return [filename]

        # single file
        if not await self.isdir(filename):
            if await self.rm_single(filename, force=force):
                return []
            return [filename]

        # directory content
        undeleted_items = []
        for entry in await self.listdir(filename):
            current_filename = posixpath.join(filename, entry)

            if match is not None and not match.match(posixpath.basename(current_filename)):
                continue

            if await self.isdir(current_filename):
                ret_undeleted_items = await self.rm(current_filename, force=True)
                undeleted_items.extend(ret_undeleted_items)
            else:
                if not await self.rm_single(current_filename, force=True):
                    undeleted_items.append(current_filename)

        # directory path
        try:
            if not await self.rm_single(filename, force=force):
                undeleted_items.append(filename)
                return undeleted_items
        except AfcException:
            if undeleted_items:
                undeleted_items.append(filename)
            else:
                raise

        if undeleted_items:
            raise AfcException(f"Failed to delete paths: {undeleted_items}", None)

        return []

    async def get_device_info(self):
        """
        Query device file-system information.

        Issues a ``GET_DEVINFO`` operation and decodes the null-terminated key/value list into
        a dictionary (e.g. model, total/free bytes, block size).

        :return: Dictionary of device file-system attributes (all values are strings).
        """
        return list_to_dict(await self._do_operation(AfcOpcode.GET_DEVINFO))

    @path_to_str()
    async def listdir(self, filename: str):
        """
        List the entries of a directory on the device.

        :param filename: Path to the directory.
        :return: Entry names in the directory, excluding ``.`` and ``..``.
        :raises AfcFileNotFoundError: if the path does not exist.
        :raises AfcException: if the path is not a directory or cannot be listed.
        """
        data = await self._do_operation(
            AfcOpcode.READ_DIR, afc_read_dir_req_t.build(AfcReadDirRequest(filename=filename)), filename
        )
        return afc_read_dir_resp_t.parse(data).filenames[2:]  # skip the . and ..

    @path_to_str()
    async def makedirs(self, filename: str):
        """
        Create a directory on the device.

        Issues a ``MAKE_DIR`` operation, creating any missing parent directories.

        :param filename: Path of the directory to create.
        :return: Raw response payload from the operation (typically empty).
        :raises AfcException: if the directory could not be created.
        """
        return await self._do_operation(
            AfcOpcode.MAKE_DIR, afc_mkdir_req_t.build(AfcMkdirRequest(filename=filename)), filename
        )

    @path_to_str()
    async def isdir(self, filename: str) -> bool:
        """
        Check whether a path is a directory.

        Stats the path and tests whether its ``st_ifmt`` is ``S_IFDIR``.

        :param filename: Path to check.
        :return: True if the path is a directory, False otherwise.
        :raises AfcFileNotFoundError: if the path does not exist.
        """
        stat = await self.stat(filename)
        return stat.get("st_ifmt") == "S_IFDIR"

    @path_to_str()
    async def stat(self, filename: str):
        """
        Get file or directory metadata.

        Issues a ``GET_FILE_INFO`` operation and returns the decoded attribute dictionary.
        Numeric fields (``st_size``, ``st_blocks``, ``st_nlink``) are converted to ints;
        ``st_mtime`` and ``st_birthtime`` are converted from device nanosecond timestamps to
        `datetime`. A ``st_ifmt`` entry (e.g. ``S_IFREG``, ``S_IFDIR``,
        ``S_IFLNK``) describes the file type, and symbolic links also carry a ``LinkTarget``.

        :param filename: Path to the file or directory.
        :return: Dictionary of file attributes.
        :raises AfcFileNotFoundError: if the path does not exist (AFC ``READ_ERROR``).
        :raises AfcException: for any other failure status.
        """
        try:
            stat = list_to_dict(
                await self._do_operation(
                    AfcOpcode.GET_FILE_INFO, afc_stat_t.build(AfcStatRequest(filename=filename)), filename
                )
            )
        except AfcException as e:
            if e.status != AfcError.READ_ERROR:
                raise
            raise AfcFileNotFoundError(e.args[0], e.status) from e

        stat["st_size"] = int(stat["st_size"])
        stat["st_blocks"] = int(stat["st_blocks"])
        stat["st_mtime"] = int(stat["st_mtime"])
        stat["st_birthtime"] = int(stat["st_birthtime"])
        stat["st_nlink"] = int(stat["st_nlink"])
        stat["st_mtime"] = datetime.fromtimestamp(stat["st_mtime"] / (10**9))
        stat["st_birthtime"] = datetime.fromtimestamp(stat["st_birthtime"] / (10**9))
        return stat

    @path_to_str()
    async def os_stat(self, path: str):
        """
        Get file statistics as an `stat`-style result.

        Wraps `stat`, mapping the AFC ``st_ifmt`` to a numeric mode and packing the
        attributes into a `StatResult` namedtuple. ``st_ino`` is synthesized from the
        normalized path, ``st_uid``/``st_gid``/``st_dev`` are reported as 0 and ``st_blksize``
        as 4096 (AFC does not expose these). ``st_atime`` mirrors ``st_mtime``.

        :param path: Path to the file or directory.
        :return: A `StatResult` namedtuple.
        :raises AfcFileNotFoundError: if the path does not exist.
        """
        stat = await self.stat(path)
        mode = 0
        for s_mode in ["S_IFDIR", "S_IFCHR", "S_IFBLK", "S_IFREG", "S_IFIFO", "S_IFLNK", "S_IFSOCK"]:
            if stat["st_ifmt"] == s_mode:
                mode = getattr(stat_module, s_mode)
        return StatResult(
            mode,
            hash(posixpath.normpath(path)),
            0,
            stat["st_nlink"],
            0,
            0,
            stat["st_size"],
            stat["st_mtime"].timestamp(),
            stat["st_mtime"].timestamp(),
            stat["st_birthtime"].timestamp(),
            stat["st_blocks"],
            4096,
            stat["st_birthtime"].timestamp(),
        )

    @path_to_str()
    async def link(self, target: str, source: str, type_=AfcLinkType.SYMLINK):
        """
        Create a symbolic or hard link on the device.

        :param target: Path the link points to.
        :param source: Path at which the link is created.
        :param type_: `AfcLinkType` to create (``SYMLINK`` by default, or ``HARDLINK``).
        :return: Raw response payload from the operation (typically empty).
        :raises AfcException: if the link could not be created.
        """
        return await self._do_operation(
            AfcOpcode.MAKE_LINK,
            afc_make_link_req_t.build(AfcMakeLinkRequest(type=type_, target=target, source=source)),
        )

    @path_to_str()
    async def fopen(self, filename: str, mode: str = "r") -> int:
        """
        Open a file on the device and return its handle.

        :param filename: Path to the file.
        :param mode: Textual open mode; one of ``'r'``, ``'r+'``, ``'w'``, ``'w+'``, ``'a'``,
            ``'a+'`` (mapped to the corresponding AFC fopen mode).
        :return: Integer handle usable with `fread`, `fwrite`, `lock` and
            `fclose`.
        :raises ArgumentError: if ``mode`` is not one of the supported textual modes.
        :raises AfcException: if the file could not be opened.
        """
        if mode not in AFC_FOPEN_TEXTUAL_MODES:
            raise ArgumentError(f"mode can be only one of: {AFC_FOPEN_TEXTUAL_MODES.keys()}")

        data = await self._do_operation(
            AfcOpcode.FILE_OPEN,
            afc_fopen_req_t.build(
                AfcFopenRequest(mode=AFC_FOPEN_TEXTUAL_MODES[mode], filename=filename),
            ),
        )
        return afc_fopen_resp_t.parse(data).handle

    async def fclose(self, handle: int):
        """
        Close an open file handle.

        :param handle: Handle returned by `fopen`.
        :return: Raw response payload from the operation (typically empty).
        :raises AfcException: if the close operation fails.
        """
        return await self._do_operation(AfcOpcode.FILE_CLOSE, afc_fclose_req_t.build(AfcFcloseRequest(handle=handle)))

    @path_to_str()
    async def rename(self, source: str, target: str) -> None:
        """
        Rename or move a file or directory on the device.

        :param source: Current path of the file or directory.
        :param target: New path for the file or directory.
        :raises AfcFileNotFoundError: if the operation fails and ``source`` no longer exists.
        :raises AfcException: if the operation fails while ``source`` still exists.
        """
        try:
            await self._do_operation(
                AfcOpcode.RENAME_PATH,
                afc_rename_req_t.build(AfcRenameRequest(source=source, target=target)),
            )
        except AfcException as e:
            if await self.exists(source):
                raise
            raise AfcFileNotFoundError(
                f"Failed to rename {source} into {target}. Got status: {e.status}", e.args[0], str(e.status)
            ) from e

    async def fread(self, handle: int, sz: int) -> bytes:
        """
        Read data from an open file handle.

        Automatically handles large reads by splitting into multiple operations.
        Each chunk send+receive is performed atomically via the demux reader so
        concurrent callers on the same AfcService instance are safe.

        :param handle: Handle returned by `fopen`.
        :param sz: Number of bytes to read. Reads larger than ``MAXIMUM_READ_SIZE`` (4 MB) are
            split into successive operations.
        :return: The bytes read.
        :raises AfcException: if any read operation returns a non-success status.
        """
        data = b""
        while sz > 0:
            to_read = MAXIMUM_READ_SIZE if sz > MAXIMUM_READ_SIZE else sz
            status, chunk = await self._send_and_wait(
                AfcOpcode.READ, afc_fread_req_t.build(AfcFreadRequest(handle=handle, size=to_read))
            )
            if status != AfcError.SUCCESS:
                raise AfcException("fread error", status)
            sz -= to_read
            data += chunk
        return data

    async def fwrite(self, handle: int, data: bytes, chunk_size: int = MAXIMUM_WRITE_SIZE) -> None:
        """
        Write data to an open file handle.

        Automatically handles large writes by splitting into multiple operations.
        Each chunk send+receive is performed atomically via the demux reader so
        concurrent callers on the same AfcService instance are safe.

        :param handle: Handle returned by `fopen`.
        :param data: Bytes to write.
        :param chunk_size: Maximum bytes per ``WRITE`` operation (default ``MAXIMUM_WRITE_SIZE``).
        :raises AfcException: if any write operation returns a non-success status.
        """
        file_handle = struct.pack("<Q", handle)
        chunks_count = len(data) // chunk_size
        for i in range(chunks_count):
            chunk = data[i * chunk_size : (i + 1) * chunk_size]
            status, _ = await self._send_and_wait(AfcOpcode.WRITE, file_handle + chunk, this_length=48)
            if status != AfcError.SUCCESS:
                raise AfcException(f"failed to write chunk: {status}", status)

        if len(data) % chunk_size:
            chunk = data[chunks_count * chunk_size :]
            status, _ = await self._send_and_wait(AfcOpcode.WRITE, file_handle + chunk, this_length=48)
            if status != AfcError.SUCCESS:
                raise AfcException(f"failed to write last chunk: {status}", status)

    @path_to_str()
    async def resolve_path(self, filename: str):
        """
        Resolve a symbolic link to its target path.

        Stats the path; if it is a symbolic link, returns its ``LinkTarget`` (joined against
        the link's parent directory when the target is relative). Non-links are returned
        unchanged. Only a single level of link is resolved.

        :param filename: Path to resolve.
        :return: The link target, or ``filename`` unchanged if it is not a symbolic link.
        :raises AfcFileNotFoundError: if the path does not exist.
        """
        info = await self.stat(filename)
        if info["st_ifmt"] == "S_IFLNK":
            target = info["LinkTarget"]
            filename = posixpath.join(posixpath.dirname(filename), target) if not target.startswith("/") else target
        return filename

    @path_to_str()
    async def get_file_contents(self, filename: str):
        """
        Read and return the entire contents of a file.

        Resolves any symbolic link, verifies the target is a regular file, then opens it,
        reads ``st_size`` bytes and closes the handle.

        :param filename: Path to the file.
        :return: The file contents as bytes.
        :raises AfcException: if the path is not a regular file (``S_IFREG``).
        :raises AfcFileNotFoundError: if the path does not exist.
        """
        filename = await self.resolve_path(filename)
        info = await self.stat(filename)

        if info["st_ifmt"] != "S_IFREG":
            raise AfcException(f"{filename} isn't a file", AfcError.INVALID_ARG, filename)

        h = await self.fopen(filename)
        if not h:
            return
        d = await self.fread(h, int(info["st_size"]))
        await self.fclose(h)
        return d

    @path_to_str()
    async def set_file_contents(self, filename: str, data: bytes) -> None:
        """
        Write data to a file, creating or truncating it.

        Opens the file in ``'w'`` mode (create/truncate), writes ``data`` and closes the handle.

        :param filename: Path to the file.
        :param data: Bytes to write.
        :raises AfcException: if the open or write operation fails.
        """
        h = await self.fopen(filename, "w")
        await self.fwrite(h, data)
        await self.fclose(h)

    @path_to_str()
    async def walk(self, dirname: str):
        """
        Recursively walk a directory tree, similar to `walk`.

        Yields one ``(dirpath, dirnames, filenames)`` tuple per directory, top-down, starting
        at ``dirname``. ``dirnames`` lists subdirectory names and ``filenames`` lists all other
        entries. Symbolic links to directories are classified as files (not descended into).

        :param dirname: Root directory to walk.
        :yields: ``(dirpath, dirnames, filenames)`` tuples.
        """
        dirs = []
        files = []
        for fd in await self.listdir(dirname):
            if fd in (".", "..", ""):
                continue
            infos = await self.stat(posixpath.join(dirname, fd))
            if infos and infos.get("st_ifmt") == "S_IFDIR":
                dirs.append(fd)
            else:
                files.append(fd)

        yield dirname, dirs, files

        if dirs:
            for d in dirs:
                async for item in self.walk(posixpath.join(dirname, d)):
                    yield item

    @path_to_str()
    async def dirlist(self, root: str, depth: int = -1):
        """
        Recursively yield paths under a directory up to a maximum depth.

        Yields ``root`` itself first, followed by the full path of every entry found while
        walking, pruning descent once a directory's depth (in path separators) reaches
        ``depth``.

        :param root: Root directory to list.
        :param depth: Maximum traversal depth; ``-1`` for unlimited, ``0`` to yield only ``root``.
        :yields: Full paths of files and directories.
        """
        async for folder, dirs, files in self.walk(root):
            if folder == root:
                yield folder
                if depth == 0:
                    break
            if folder != root and depth != -1 and folder.count(posixpath.sep) >= depth:
                continue
            for entry in dirs + files:
                yield posixpath.join(folder, entry)

    async def lock(self, handle, operation):
        """
        Apply or release an advisory ``flock``-style lock on an open file.

        :param handle: Handle returned by `fopen`.
        :param operation: Lock operation, one of ``AFC_LOCK_SH`` (shared), ``AFC_LOCK_EX``
            (exclusive) or ``AFC_LOCK_UN`` (unlock).
        :return: Raw response payload from the operation (typically empty).
        :raises AfcException: if the lock operation fails.
        """
        return await self._do_operation(
            AfcOpcode.FILE_LOCK, afc_lock_t.build(AfcLockRequest(handle=handle, op=operation))
        )

    async def _dispatch_packet(self, operation: AfcOpcode, data: bytes, this_length: int = 0) -> int:
        """
        Send an AFC protocol packet to the device.

        :param operation: AFC operation code
        :param data: Packet payload data
        :param this_length: Override for the packet length field (0 for auto-calculation)
        :return: The packet_num assigned to this packet (used to match the response)
        """
        pkt_num = self.packet_num
        entire_length = afc_header_t.sizeof() + len(data)
        header = afc_header_t.build(
            AfcHeader(
                entire_length=entire_length,
                this_length=this_length or entire_length,
                packet_num=pkt_num,
                operation=operation,
            )
        )
        self.packet_num += 1
        await self.service.sendall(header + data)
        return pkt_num

    async def _receive_data(self):
        """
        Receive an AFC protocol response packet from the device.

        .. deprecated::
            Internal use only. External callers should use `_do_operation` or
            `_send_and_wait` which use the background demux reader.

        :return: Tuple of (status_code, response_data)
        """
        header_bytes = await self.service.recvall(afc_header_t.sizeof())
        status = AfcError.SUCCESS
        data = b""
        if header_bytes:
            header = afc_header_t.parse(header_bytes)
            assert header.entire_length >= afc_header_t.sizeof()
            length = header.entire_length - afc_header_t.sizeof()
            data = await self.service.recvall(length)
            if header.operation == AfcOpcode.STATUS:
                if length != 8:
                    self.logger.error("Status length != 8")
                status = afc_error_construct.parse(data)
            elif header.operation != AfcOpcode.DATA:
                self.logger.debug("Unexpected AFC opcode %s", header.operation)
        return status, data

    async def _send_and_wait(self, operation: AfcOpcode, data: bytes, this_length: int = 0):
        """
        Atomically send one AFC packet and wait for its response via the demux reader.

        The write lock ensures that ``packet_num`` assignment, Future registration, and
        the actual ``sendall`` are an uninterruptible unit so the background reader always
        finds a registered Future before the response arrives.

        If the background reader task has not been started yet (e.g. when the service is
        used without ``async with``), it is started lazily on the first call.

        :param operation: AFC operation code
        :param data: Packet payload
        :param this_length: Override for the ``this_length`` header field
        :return: Tuple of (AfcError, response_bytes)
        """
        # Ensure we're connected (lazy connect for non-async-with usage)
        await self.connect()
        # Start background reader lazily for callers that don't use async with.
        # Also restart it if it exited (e.g. connection error): the new task will
        # fail immediately on the broken socket and propagate the error to the caller
        # via _afc_pending, giving a clean exception rather than an infinite hang.
        if self._afc_reader_task is None or self._afc_reader_task.done():
            self._afc_reader_task = asyncio.create_task(self._afc_reader_loop(), name="afc-reader")
        loop = asyncio.get_event_loop()
        async with self._afc_write_lock:
            fut: asyncio.Future = loop.create_future()
            # Register *before* sending so the reader never misses the response
            pkt_num = self.packet_num
            self._afc_pending[pkt_num] = fut
            try:
                await self._dispatch_packet(operation, data, this_length)
            except BaseException:
                self._afc_pending.pop(pkt_num, None)
                raise
        return await fut

    async def _do_operation(self, opcode: AfcOpcode, data: bytes = b"", filename: Optional[str] = None) -> bytes:
        """
        Performs a low-level operation using the specified opcode and additional data.

        This method dispatches a packet with the given opcode and data, waits for a
        response via the background demux reader, and processes the result to determine
        success or failure. Multiple callers may call this concurrently on the same
        instance; each operation is matched to its response by ``packet_num``.

        :param opcode: The operation code specifying the type of operation to perform.
        :param data: The additional data to send along with the operation. Defaults to an empty byte string.
        :param filename: The filename associated with the operation, if applicable. Defaults to None.

        :returns: bytes: The data received as a response to the operation.

        :raises:
            AfcException: General exception raised if the operation fails with an
                          unspecified error status.
            AfcFileNotFoundError: Exception raised when the operation fails due to
                                  an object not being found (e.g., file or directory).
        """
        status, data = await self._send_and_wait(opcode, data)

        exception = AfcException
        if status != AfcError.SUCCESS:
            if status == AfcError.OBJECT_NOT_FOUND:
                exception = AfcFileNotFoundError

            opcode_name = opcode.name if isinstance(opcode, AfcOpcode) else opcode
            message = f"Opcode: {opcode_name} failed with status: {status}"
            if filename is not None:
                message += f" for file: {filename}"
            raise exception(message, status, filename)

        return data

pull async

pull(relative_src: str, dst: str, match: Optional[Pattern] = None, callback: Optional[Callable] = None, src_dir: str = '', ignore_errors: bool = False, progress_bar: bool = True) -> None

Pull (download) a file or directory from the device to the local machine.

The source is resolved through any symbolic link before transfer. A regular file is written to dst; if dst is an existing local directory, the file is placed inside it under the source's basename. Files larger than MAXIMUM_READ_SIZE (4 MB) are streamed in chunks. The local file's modification time is set to match the device's st_mtime. A directory source is copied recursively into a subdirectory of dst named after the source's basename.

Parameters:

Name Type Description Default
relative_src str

Source path on the device, resolved relative to src_dir.

required
dst str

Destination path on the local machine.

required
match Optional[Pattern]

Optional regex; only entries whose basename matches (via search) are copied. Non-matching files are skipped.

None
callback Optional[Callable]

Optional callable invoked as callback(src, dst) after each regular file is copied. If it returns an awaitable, the awaitable is awaited.

None
src_dir str

Base directory on the device against which relative_src is joined.

''
ignore_errors bool

If True, per-entry errors during directory recursion are logged and skipped instead of propagated.

False
progress_bar bool

If True, display a tqdm progress bar while streaming large files.

True
Source code in pymobiledevice3/services/afc.py
async def pull(
    self,
    relative_src: str,
    dst: str,
    match: Optional[Pattern] = None,
    callback: Optional[Callable] = None,
    src_dir: str = "",
    ignore_errors: bool = False,
    progress_bar: bool = True,
) -> None:
    """
    Pull (download) a file or directory from the device to the local machine.

    The source is resolved through any symbolic link before transfer. A regular file is
    written to ``dst``; if ``dst`` is an existing local directory, the file is placed
    inside it under the source's basename. Files larger than ``MAXIMUM_READ_SIZE`` (4 MB)
    are streamed in chunks. The local file's modification time is set to match the
    device's ``st_mtime``. A directory source is copied recursively into a subdirectory
    of ``dst`` named after the source's basename.

    :param relative_src: Source path on the device, resolved relative to ``src_dir``.
    :param dst: Destination path on the local machine.
    :param match: Optional regex; only entries whose basename matches (via ``search``) are
        copied. Non-matching files are skipped.
    :param callback: Optional callable invoked as ``callback(src, dst)`` after each regular
        file is copied. If it returns an awaitable, the awaitable is awaited.
    :param src_dir: Base directory on the device against which ``relative_src`` is joined.
    :param ignore_errors: If True, per-entry errors during directory recursion are logged
        and skipped instead of propagated.
    :param progress_bar: If True, display a tqdm progress bar while streaming large files.
    """
    src = await self.resolve_path(posixpath.join(src_dir, relative_src))

    if not await self.isdir(src):
        # normal file
        if match is not None and not match.search(posixpath.basename(src)):
            return
        if os.path.isdir(dst):
            dst = os.path.join(dst, os.path.basename(relative_src))
        with open(dst, "wb") as f:
            src_size = (await self.stat(src))["st_size"]
            if src_size <= MAXIMUM_READ_SIZE:
                f.write(await self.get_file_contents(src))
            else:
                handle = await self.fopen(src)
                iterator = range(0, src_size, MAXIMUM_READ_SIZE)
                if progress_bar:
                    iterator = trange(0, src_size, MAXIMUM_READ_SIZE)
                for offset in iterator:
                    to_read = min(MAXIMUM_READ_SIZE, src_size - offset)
                    f.write(await self.fread(handle, to_read))
                await self.fclose(handle)
        os.utime(dst, (os.stat(dst).st_atime, (await self.stat(src))["st_mtime"].timestamp()))
        if callback is not None:
            result = callback(src, dst)
            if inspect.isawaitable(result):
                await result
    else:
        # directory
        dst_path = pathlib.Path(dst) / os.path.basename(relative_src)
        dst_path.mkdir(parents=True, exist_ok=True)

        for filename in await self.listdir(src):
            src_filename = posixpath.join(src, filename)
            dst_filename = dst_path / filename

            src_filename = await self.resolve_path(src_filename)

            if match is not None and not match.search(posixpath.basename(src_filename)):
                continue

            try:
                if await self.isdir(src_filename):
                    dst_filename.mkdir(exist_ok=True)
                    await self.pull(
                        src_filename,
                        str(dst_path),
                        match=match,
                        callback=callback,
                        ignore_errors=ignore_errors,
                        progress_bar=progress_bar,
                    )
                    continue

                await self.pull(
                    src_filename,
                    str(dst_path),
                    match=match,
                    callback=callback,
                    ignore_errors=ignore_errors,
                    progress_bar=progress_bar,
                )

            except Exception as afc_exception:
                if not ignore_errors:
                    raise
                self.logger.warning(f"(Ignoring) Error: {afc_exception} occurred during the copy of {src_filename}")

exists async

exists(filename)

Check whether a path exists on the device.

Implemented by attempting stat; a missing path is reported as False and any other AFC error propagates.

Parameters:

Name Type Description Default
filename

Path to check.

required

Returns:

Type Description

True if the path exists, False if it was not found.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def exists(self, filename):
    """
    Check whether a path exists on the device.

    Implemented by attempting `stat`; a missing path is reported as False and any
    other AFC error propagates.

    :param filename: Path to check.
    :return: True if the path exists, False if it was not found.
    """
    try:
        await self.stat(filename)
    except AfcFileNotFoundError:
        return False
    return True

wait_exists async

wait_exists(filename)

Block until a path exists on the device.

Polls exists every 0.1 seconds until the path appears. Does not return until the path exists.

Parameters:

Name Type Description Default
filename

Path to wait for.

required
Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def wait_exists(self, filename):
    """
    Block until a path exists on the device.

    Polls `exists` every 0.1 seconds until the path appears. Does not return
    until the path exists.

    :param filename: Path to wait for.
    """
    while not await self.exists(filename):
        await asyncio.sleep(0.1)

push async

push(local_path, remote_path, callback=None)

Push (upload) a file or directory from the local machine to the device.

A regular file is written to remote_path (or, if remote_path is an existing remote directory, into it under the file's basename). A local directory is copied recursively into remote_path/, creating remote directories as needed.

Parameters:

Name Type Description Default
local_path

Source path on the local machine.

required
remote_path

Destination path on the device.

required
callback

Optional callable invoked as callback(local_path, remote_path) before each entry is transferred.

None
Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def push(self, local_path, remote_path, callback=None):
    """
    Push (upload) a file or directory from the local machine to the device.

    A regular file is written to ``remote_path`` (or, if ``remote_path`` is an existing
    remote directory, into it under the file's basename). A local directory is copied
    recursively into ``remote_path``/<basename of local_path>, creating remote directories
    as needed.

    :param local_path: Source path on the local machine.
    :param remote_path: Destination path on the device.
    :param callback: Optional callable invoked as ``callback(local_path, remote_path)``
        before each entry is transferred.
    """
    if os.path.isdir(local_path):
        remote_path = posixpath.join(remote_path, os.path.basename(local_path))
    await self._push_internal(local_path, remote_path, callback)

rm_single async

rm_single(filename: str, force: bool = False) -> bool

Remove a single file or empty directory on the device.

Issues a single REMOVE_PATH operation; it does not recurse into a non-empty directory.

Parameters:

Name Type Description Default
filename str

Path to the file or directory to remove.

required
force bool

If True, swallow an AfcException and return False instead of raising.

False

Returns:

Type Description
bool

True on success; False if the removal failed and force is True.

Raises:

Type Description
AfcException

if the removal fails and force is False.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def rm_single(self, filename: str, force: bool = False) -> bool:
    """
    Remove a single file or empty directory on the device.

    Issues a single ``REMOVE_PATH`` operation; it does not recurse into a non-empty
    directory.

    :param filename: Path to the file or directory to remove.
    :param force: If True, swallow an `AfcException` and return False instead of raising.
    :return: True on success; False if the removal failed and ``force`` is True.
    :raises AfcException: if the removal fails and ``force`` is False.
    """
    try:
        await self._do_operation(
            AfcOpcode.REMOVE_PATH, afc_rm_req_t.build(AfcRmRequest(filename=filename)), filename
        )
    except AfcException:
        if force:
            return False
        raise
    return True

rm async

rm(filename: str, match: Optional[Pattern] = None, force: bool = False) -> list[str]

Recursively remove a file or directory tree on the device.

For a directory, every entry is removed depth-first and the directory itself is removed last. The match pattern, when given, filters which top-level entries of filename are considered (matched against their basename via match); nested entries are always removed once their parent is selected.

Parameters:

Name Type Description Default
filename str

Path to the file or directory to remove.

required
match Optional[Pattern]

Optional regex; only directory entries whose basename matches are removed. None removes all entries.

None
force bool

If True, suppress the final AfcException and instead return the list of paths that could not be deleted.

False

Returns:

Type Description
list[str]

List of paths that could not be deleted (empty on full success).

Raises:

Type Description
AfcException

if any path could not be deleted and force is False.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def rm(self, filename: str, match: Optional[Pattern] = None, force: bool = False) -> list[str]:
    """
    Recursively remove a file or directory tree on the device.

    For a directory, every entry is removed depth-first and the directory itself is removed
    last. The ``match`` pattern, when given, filters which top-level entries of ``filename``
    are considered (matched against their basename via ``match``); nested entries are always
    removed once their parent is selected.

    :param filename: Path to the file or directory to remove.
    :param match: Optional regex; only directory entries whose basename matches are removed.
        None removes all entries.
    :param force: If True, suppress the final `AfcException` and instead return the
        list of paths that could not be deleted.
    :return: List of paths that could not be deleted (empty on full success).
    :raises AfcException: if any path could not be deleted and ``force`` is False.
    """
    if not await self.exists(filename) and not await self.rm_single(filename, force=force):
        return [filename]

    # single file
    if not await self.isdir(filename):
        if await self.rm_single(filename, force=force):
            return []
        return [filename]

    # directory content
    undeleted_items = []
    for entry in await self.listdir(filename):
        current_filename = posixpath.join(filename, entry)

        if match is not None and not match.match(posixpath.basename(current_filename)):
            continue

        if await self.isdir(current_filename):
            ret_undeleted_items = await self.rm(current_filename, force=True)
            undeleted_items.extend(ret_undeleted_items)
        else:
            if not await self.rm_single(current_filename, force=True):
                undeleted_items.append(current_filename)

    # directory path
    try:
        if not await self.rm_single(filename, force=force):
            undeleted_items.append(filename)
            return undeleted_items
    except AfcException:
        if undeleted_items:
            undeleted_items.append(filename)
        else:
            raise

    if undeleted_items:
        raise AfcException(f"Failed to delete paths: {undeleted_items}", None)

    return []

get_device_info async

get_device_info()

Query device file-system information.

Issues a GET_DEVINFO operation and decodes the null-terminated key/value list into a dictionary (e.g. model, total/free bytes, block size).

Returns:

Type Description

Dictionary of device file-system attributes (all values are strings).

Source code in pymobiledevice3/services/afc.py
async def get_device_info(self):
    """
    Query device file-system information.

    Issues a ``GET_DEVINFO`` operation and decodes the null-terminated key/value list into
    a dictionary (e.g. model, total/free bytes, block size).

    :return: Dictionary of device file-system attributes (all values are strings).
    """
    return list_to_dict(await self._do_operation(AfcOpcode.GET_DEVINFO))

listdir async

listdir(filename: str)

List the entries of a directory on the device.

Parameters:

Name Type Description Default
filename str

Path to the directory.

required

Returns:

Type Description

Entry names in the directory, excluding . and ...

Raises:

Type Description
AfcFileNotFoundError

if the path does not exist.

AfcException

if the path is not a directory or cannot be listed.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def listdir(self, filename: str):
    """
    List the entries of a directory on the device.

    :param filename: Path to the directory.
    :return: Entry names in the directory, excluding ``.`` and ``..``.
    :raises AfcFileNotFoundError: if the path does not exist.
    :raises AfcException: if the path is not a directory or cannot be listed.
    """
    data = await self._do_operation(
        AfcOpcode.READ_DIR, afc_read_dir_req_t.build(AfcReadDirRequest(filename=filename)), filename
    )
    return afc_read_dir_resp_t.parse(data).filenames[2:]  # skip the . and ..

makedirs async

makedirs(filename: str)

Create a directory on the device.

Issues a MAKE_DIR operation, creating any missing parent directories.

Parameters:

Name Type Description Default
filename str

Path of the directory to create.

required

Returns:

Type Description

Raw response payload from the operation (typically empty).

Raises:

Type Description
AfcException

if the directory could not be created.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def makedirs(self, filename: str):
    """
    Create a directory on the device.

    Issues a ``MAKE_DIR`` operation, creating any missing parent directories.

    :param filename: Path of the directory to create.
    :return: Raw response payload from the operation (typically empty).
    :raises AfcException: if the directory could not be created.
    """
    return await self._do_operation(
        AfcOpcode.MAKE_DIR, afc_mkdir_req_t.build(AfcMkdirRequest(filename=filename)), filename
    )

isdir async

isdir(filename: str) -> bool

Check whether a path is a directory.

Stats the path and tests whether its st_ifmt is S_IFDIR.

Parameters:

Name Type Description Default
filename str

Path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Raises:

Type Description
AfcFileNotFoundError

if the path does not exist.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def isdir(self, filename: str) -> bool:
    """
    Check whether a path is a directory.

    Stats the path and tests whether its ``st_ifmt`` is ``S_IFDIR``.

    :param filename: Path to check.
    :return: True if the path is a directory, False otherwise.
    :raises AfcFileNotFoundError: if the path does not exist.
    """
    stat = await self.stat(filename)
    return stat.get("st_ifmt") == "S_IFDIR"

stat async

stat(filename: str)

Get file or directory metadata.

Issues a GET_FILE_INFO operation and returns the decoded attribute dictionary. Numeric fields (st_size, st_blocks, st_nlink) are converted to ints; st_mtime and st_birthtime are converted from device nanosecond timestamps to datetime. A st_ifmt entry (e.g. S_IFREG, S_IFDIR, S_IFLNK) describes the file type, and symbolic links also carry a LinkTarget.

Parameters:

Name Type Description Default
filename str

Path to the file or directory.

required

Returns:

Type Description

Dictionary of file attributes.

Raises:

Type Description
AfcFileNotFoundError

if the path does not exist (AFC READ_ERROR).

AfcException

for any other failure status.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def stat(self, filename: str):
    """
    Get file or directory metadata.

    Issues a ``GET_FILE_INFO`` operation and returns the decoded attribute dictionary.
    Numeric fields (``st_size``, ``st_blocks``, ``st_nlink``) are converted to ints;
    ``st_mtime`` and ``st_birthtime`` are converted from device nanosecond timestamps to
    `datetime`. A ``st_ifmt`` entry (e.g. ``S_IFREG``, ``S_IFDIR``,
    ``S_IFLNK``) describes the file type, and symbolic links also carry a ``LinkTarget``.

    :param filename: Path to the file or directory.
    :return: Dictionary of file attributes.
    :raises AfcFileNotFoundError: if the path does not exist (AFC ``READ_ERROR``).
    :raises AfcException: for any other failure status.
    """
    try:
        stat = list_to_dict(
            await self._do_operation(
                AfcOpcode.GET_FILE_INFO, afc_stat_t.build(AfcStatRequest(filename=filename)), filename
            )
        )
    except AfcException as e:
        if e.status != AfcError.READ_ERROR:
            raise
        raise AfcFileNotFoundError(e.args[0], e.status) from e

    stat["st_size"] = int(stat["st_size"])
    stat["st_blocks"] = int(stat["st_blocks"])
    stat["st_mtime"] = int(stat["st_mtime"])
    stat["st_birthtime"] = int(stat["st_birthtime"])
    stat["st_nlink"] = int(stat["st_nlink"])
    stat["st_mtime"] = datetime.fromtimestamp(stat["st_mtime"] / (10**9))
    stat["st_birthtime"] = datetime.fromtimestamp(stat["st_birthtime"] / (10**9))
    return stat

os_stat async

os_stat(path: str)

Get file statistics as an stat-style result.

Wraps stat, mapping the AFC st_ifmt to a numeric mode and packing the attributes into a StatResult namedtuple. st_ino is synthesized from the normalized path, st_uid/st_gid/st_dev are reported as 0 and st_blksize as 4096 (AFC does not expose these). st_atime mirrors st_mtime.

Parameters:

Name Type Description Default
path str

Path to the file or directory.

required

Returns:

Type Description

A StatResult namedtuple.

Raises:

Type Description
AfcFileNotFoundError

if the path does not exist.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def os_stat(self, path: str):
    """
    Get file statistics as an `stat`-style result.

    Wraps `stat`, mapping the AFC ``st_ifmt`` to a numeric mode and packing the
    attributes into a `StatResult` namedtuple. ``st_ino`` is synthesized from the
    normalized path, ``st_uid``/``st_gid``/``st_dev`` are reported as 0 and ``st_blksize``
    as 4096 (AFC does not expose these). ``st_atime`` mirrors ``st_mtime``.

    :param path: Path to the file or directory.
    :return: A `StatResult` namedtuple.
    :raises AfcFileNotFoundError: if the path does not exist.
    """
    stat = await self.stat(path)
    mode = 0
    for s_mode in ["S_IFDIR", "S_IFCHR", "S_IFBLK", "S_IFREG", "S_IFIFO", "S_IFLNK", "S_IFSOCK"]:
        if stat["st_ifmt"] == s_mode:
            mode = getattr(stat_module, s_mode)
    return StatResult(
        mode,
        hash(posixpath.normpath(path)),
        0,
        stat["st_nlink"],
        0,
        0,
        stat["st_size"],
        stat["st_mtime"].timestamp(),
        stat["st_mtime"].timestamp(),
        stat["st_birthtime"].timestamp(),
        stat["st_blocks"],
        4096,
        stat["st_birthtime"].timestamp(),
    )
link(target: str, source: str, type_=AfcLinkType.SYMLINK)

Create a symbolic or hard link on the device.

Parameters:

Name Type Description Default
target str

Path the link points to.

required
source str

Path at which the link is created.

required
type_

AfcLinkType to create (SYMLINK by default, or HARDLINK).

SYMLINK

Returns:

Type Description

Raw response payload from the operation (typically empty).

Raises:

Type Description
AfcException

if the link could not be created.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def link(self, target: str, source: str, type_=AfcLinkType.SYMLINK):
    """
    Create a symbolic or hard link on the device.

    :param target: Path the link points to.
    :param source: Path at which the link is created.
    :param type_: `AfcLinkType` to create (``SYMLINK`` by default, or ``HARDLINK``).
    :return: Raw response payload from the operation (typically empty).
    :raises AfcException: if the link could not be created.
    """
    return await self._do_operation(
        AfcOpcode.MAKE_LINK,
        afc_make_link_req_t.build(AfcMakeLinkRequest(type=type_, target=target, source=source)),
    )

fopen async

fopen(filename: str, mode: str = 'r') -> int

Open a file on the device and return its handle.

Parameters:

Name Type Description Default
filename str

Path to the file.

required
mode str

Textual open mode; one of 'r', 'r+', 'w', 'w+', 'a', 'a+' (mapped to the corresponding AFC fopen mode).

'r'

Returns:

Type Description
int

Integer handle usable with fread, fwrite, lock and fclose.

Raises:

Type Description
ArgumentError

if mode is not one of the supported textual modes.

AfcException

if the file could not be opened.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def fopen(self, filename: str, mode: str = "r") -> int:
    """
    Open a file on the device and return its handle.

    :param filename: Path to the file.
    :param mode: Textual open mode; one of ``'r'``, ``'r+'``, ``'w'``, ``'w+'``, ``'a'``,
        ``'a+'`` (mapped to the corresponding AFC fopen mode).
    :return: Integer handle usable with `fread`, `fwrite`, `lock` and
        `fclose`.
    :raises ArgumentError: if ``mode`` is not one of the supported textual modes.
    :raises AfcException: if the file could not be opened.
    """
    if mode not in AFC_FOPEN_TEXTUAL_MODES:
        raise ArgumentError(f"mode can be only one of: {AFC_FOPEN_TEXTUAL_MODES.keys()}")

    data = await self._do_operation(
        AfcOpcode.FILE_OPEN,
        afc_fopen_req_t.build(
            AfcFopenRequest(mode=AFC_FOPEN_TEXTUAL_MODES[mode], filename=filename),
        ),
    )
    return afc_fopen_resp_t.parse(data).handle

fclose async

fclose(handle: int)

Close an open file handle.

Parameters:

Name Type Description Default
handle int

Handle returned by fopen.

required

Returns:

Type Description

Raw response payload from the operation (typically empty).

Raises:

Type Description
AfcException

if the close operation fails.

Source code in pymobiledevice3/services/afc.py
async def fclose(self, handle: int):
    """
    Close an open file handle.

    :param handle: Handle returned by `fopen`.
    :return: Raw response payload from the operation (typically empty).
    :raises AfcException: if the close operation fails.
    """
    return await self._do_operation(AfcOpcode.FILE_CLOSE, afc_fclose_req_t.build(AfcFcloseRequest(handle=handle)))

rename async

rename(source: str, target: str) -> None

Rename or move a file or directory on the device.

Parameters:

Name Type Description Default
source str

Current path of the file or directory.

required
target str

New path for the file or directory.

required

Raises:

Type Description
AfcFileNotFoundError

if the operation fails and source no longer exists.

AfcException

if the operation fails while source still exists.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def rename(self, source: str, target: str) -> None:
    """
    Rename or move a file or directory on the device.

    :param source: Current path of the file or directory.
    :param target: New path for the file or directory.
    :raises AfcFileNotFoundError: if the operation fails and ``source`` no longer exists.
    :raises AfcException: if the operation fails while ``source`` still exists.
    """
    try:
        await self._do_operation(
            AfcOpcode.RENAME_PATH,
            afc_rename_req_t.build(AfcRenameRequest(source=source, target=target)),
        )
    except AfcException as e:
        if await self.exists(source):
            raise
        raise AfcFileNotFoundError(
            f"Failed to rename {source} into {target}. Got status: {e.status}", e.args[0], str(e.status)
        ) from e

fread async

fread(handle: int, sz: int) -> bytes

Read data from an open file handle.

Automatically handles large reads by splitting into multiple operations. Each chunk send+receive is performed atomically via the demux reader so concurrent callers on the same AfcService instance are safe.

Parameters:

Name Type Description Default
handle int

Handle returned by fopen.

required
sz int

Number of bytes to read. Reads larger than MAXIMUM_READ_SIZE (4 MB) are split into successive operations.

required

Returns:

Type Description
bytes

The bytes read.

Raises:

Type Description
AfcException

if any read operation returns a non-success status.

Source code in pymobiledevice3/services/afc.py
async def fread(self, handle: int, sz: int) -> bytes:
    """
    Read data from an open file handle.

    Automatically handles large reads by splitting into multiple operations.
    Each chunk send+receive is performed atomically via the demux reader so
    concurrent callers on the same AfcService instance are safe.

    :param handle: Handle returned by `fopen`.
    :param sz: Number of bytes to read. Reads larger than ``MAXIMUM_READ_SIZE`` (4 MB) are
        split into successive operations.
    :return: The bytes read.
    :raises AfcException: if any read operation returns a non-success status.
    """
    data = b""
    while sz > 0:
        to_read = MAXIMUM_READ_SIZE if sz > MAXIMUM_READ_SIZE else sz
        status, chunk = await self._send_and_wait(
            AfcOpcode.READ, afc_fread_req_t.build(AfcFreadRequest(handle=handle, size=to_read))
        )
        if status != AfcError.SUCCESS:
            raise AfcException("fread error", status)
        sz -= to_read
        data += chunk
    return data

fwrite async

fwrite(handle: int, data: bytes, chunk_size: int = MAXIMUM_WRITE_SIZE) -> None

Write data to an open file handle.

Automatically handles large writes by splitting into multiple operations. Each chunk send+receive is performed atomically via the demux reader so concurrent callers on the same AfcService instance are safe.

Parameters:

Name Type Description Default
handle int

Handle returned by fopen.

required
data bytes

Bytes to write.

required
chunk_size int

Maximum bytes per WRITE operation (default MAXIMUM_WRITE_SIZE).

MAXIMUM_WRITE_SIZE

Raises:

Type Description
AfcException

if any write operation returns a non-success status.

Source code in pymobiledevice3/services/afc.py
async def fwrite(self, handle: int, data: bytes, chunk_size: int = MAXIMUM_WRITE_SIZE) -> None:
    """
    Write data to an open file handle.

    Automatically handles large writes by splitting into multiple operations.
    Each chunk send+receive is performed atomically via the demux reader so
    concurrent callers on the same AfcService instance are safe.

    :param handle: Handle returned by `fopen`.
    :param data: Bytes to write.
    :param chunk_size: Maximum bytes per ``WRITE`` operation (default ``MAXIMUM_WRITE_SIZE``).
    :raises AfcException: if any write operation returns a non-success status.
    """
    file_handle = struct.pack("<Q", handle)
    chunks_count = len(data) // chunk_size
    for i in range(chunks_count):
        chunk = data[i * chunk_size : (i + 1) * chunk_size]
        status, _ = await self._send_and_wait(AfcOpcode.WRITE, file_handle + chunk, this_length=48)
        if status != AfcError.SUCCESS:
            raise AfcException(f"failed to write chunk: {status}", status)

    if len(data) % chunk_size:
        chunk = data[chunks_count * chunk_size :]
        status, _ = await self._send_and_wait(AfcOpcode.WRITE, file_handle + chunk, this_length=48)
        if status != AfcError.SUCCESS:
            raise AfcException(f"failed to write last chunk: {status}", status)

resolve_path async

resolve_path(filename: str)

Resolve a symbolic link to its target path.

Stats the path; if it is a symbolic link, returns its LinkTarget (joined against the link's parent directory when the target is relative). Non-links are returned unchanged. Only a single level of link is resolved.

Parameters:

Name Type Description Default
filename str

Path to resolve.

required

Returns:

Type Description

The link target, or filename unchanged if it is not a symbolic link.

Raises:

Type Description
AfcFileNotFoundError

if the path does not exist.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def resolve_path(self, filename: str):
    """
    Resolve a symbolic link to its target path.

    Stats the path; if it is a symbolic link, returns its ``LinkTarget`` (joined against
    the link's parent directory when the target is relative). Non-links are returned
    unchanged. Only a single level of link is resolved.

    :param filename: Path to resolve.
    :return: The link target, or ``filename`` unchanged if it is not a symbolic link.
    :raises AfcFileNotFoundError: if the path does not exist.
    """
    info = await self.stat(filename)
    if info["st_ifmt"] == "S_IFLNK":
        target = info["LinkTarget"]
        filename = posixpath.join(posixpath.dirname(filename), target) if not target.startswith("/") else target
    return filename

get_file_contents async

get_file_contents(filename: str)

Read and return the entire contents of a file.

Resolves any symbolic link, verifies the target is a regular file, then opens it, reads st_size bytes and closes the handle.

Parameters:

Name Type Description Default
filename str

Path to the file.

required

Returns:

Type Description

The file contents as bytes.

Raises:

Type Description
AfcException

if the path is not a regular file (S_IFREG).

AfcFileNotFoundError

if the path does not exist.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def get_file_contents(self, filename: str):
    """
    Read and return the entire contents of a file.

    Resolves any symbolic link, verifies the target is a regular file, then opens it,
    reads ``st_size`` bytes and closes the handle.

    :param filename: Path to the file.
    :return: The file contents as bytes.
    :raises AfcException: if the path is not a regular file (``S_IFREG``).
    :raises AfcFileNotFoundError: if the path does not exist.
    """
    filename = await self.resolve_path(filename)
    info = await self.stat(filename)

    if info["st_ifmt"] != "S_IFREG":
        raise AfcException(f"{filename} isn't a file", AfcError.INVALID_ARG, filename)

    h = await self.fopen(filename)
    if not h:
        return
    d = await self.fread(h, int(info["st_size"]))
    await self.fclose(h)
    return d

set_file_contents async

set_file_contents(filename: str, data: bytes) -> None

Write data to a file, creating or truncating it.

Opens the file in 'w' mode (create/truncate), writes data and closes the handle.

Parameters:

Name Type Description Default
filename str

Path to the file.

required
data bytes

Bytes to write.

required

Raises:

Type Description
AfcException

if the open or write operation fails.

Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def set_file_contents(self, filename: str, data: bytes) -> None:
    """
    Write data to a file, creating or truncating it.

    Opens the file in ``'w'`` mode (create/truncate), writes ``data`` and closes the handle.

    :param filename: Path to the file.
    :param data: Bytes to write.
    :raises AfcException: if the open or write operation fails.
    """
    h = await self.fopen(filename, "w")
    await self.fwrite(h, data)
    await self.fclose(h)

walk async

walk(dirname: str)

Recursively walk a directory tree, similar to walk.

Yields one (dirpath, dirnames, filenames) tuple per directory, top-down, starting at dirname. dirnames lists subdirectory names and filenames lists all other entries. Symbolic links to directories are classified as files (not descended into).

:yields: (dirpath, dirnames, filenames) tuples.

Parameters:

Name Type Description Default
dirname str

Root directory to walk.

required
Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def walk(self, dirname: str):
    """
    Recursively walk a directory tree, similar to `walk`.

    Yields one ``(dirpath, dirnames, filenames)`` tuple per directory, top-down, starting
    at ``dirname``. ``dirnames`` lists subdirectory names and ``filenames`` lists all other
    entries. Symbolic links to directories are classified as files (not descended into).

    :param dirname: Root directory to walk.
    :yields: ``(dirpath, dirnames, filenames)`` tuples.
    """
    dirs = []
    files = []
    for fd in await self.listdir(dirname):
        if fd in (".", "..", ""):
            continue
        infos = await self.stat(posixpath.join(dirname, fd))
        if infos and infos.get("st_ifmt") == "S_IFDIR":
            dirs.append(fd)
        else:
            files.append(fd)

    yield dirname, dirs, files

    if dirs:
        for d in dirs:
            async for item in self.walk(posixpath.join(dirname, d)):
                yield item

dirlist async

dirlist(root: str, depth: int = -1)

Recursively yield paths under a directory up to a maximum depth.

Yields root itself first, followed by the full path of every entry found while walking, pruning descent once a directory's depth (in path separators) reaches depth.

:yields: Full paths of files and directories.

Parameters:

Name Type Description Default
root str

Root directory to list.

required
depth int

Maximum traversal depth; -1 for unlimited, 0 to yield only root.

-1
Source code in pymobiledevice3/services/afc.py
@path_to_str()
async def dirlist(self, root: str, depth: int = -1):
    """
    Recursively yield paths under a directory up to a maximum depth.

    Yields ``root`` itself first, followed by the full path of every entry found while
    walking, pruning descent once a directory's depth (in path separators) reaches
    ``depth``.

    :param root: Root directory to list.
    :param depth: Maximum traversal depth; ``-1`` for unlimited, ``0`` to yield only ``root``.
    :yields: Full paths of files and directories.
    """
    async for folder, dirs, files in self.walk(root):
        if folder == root:
            yield folder
            if depth == 0:
                break
        if folder != root and depth != -1 and folder.count(posixpath.sep) >= depth:
            continue
        for entry in dirs + files:
            yield posixpath.join(folder, entry)

lock async

lock(handle, operation)

Apply or release an advisory flock-style lock on an open file.

Parameters:

Name Type Description Default
handle

Handle returned by fopen.

required
operation

Lock operation, one of AFC_LOCK_SH (shared), AFC_LOCK_EX (exclusive) or AFC_LOCK_UN (unlock).

required

Returns:

Type Description

Raw response payload from the operation (typically empty).

Raises:

Type Description
AfcException

if the lock operation fails.

Source code in pymobiledevice3/services/afc.py
async def lock(self, handle, operation):
    """
    Apply or release an advisory ``flock``-style lock on an open file.

    :param handle: Handle returned by `fopen`.
    :param operation: Lock operation, one of ``AFC_LOCK_SH`` (shared), ``AFC_LOCK_EX``
        (exclusive) or ``AFC_LOCK_UN`` (unlock).
    :return: Raw response payload from the operation (typically empty).
    :raises AfcException: if the lock operation fails.
    """
    return await self._do_operation(
        AfcOpcode.FILE_LOCK, afc_lock_t.build(AfcLockRequest(handle=handle, op=operation))
    )

pymobiledevice3.services.diagnostics.DiagnosticsService

Bases: LockdownService

Client for the device's diagnostics_relay lockdown service.

Exposes the diagnostics relay over a plist request/response channel, allowing callers to:

  • Query MobileGestalt and IORegistry keys.
  • Read diagnostic information reports (e.g. battery, Wi-Fi, generic diagnostics).
  • Restart, shut down or put the device to sleep.
Source code in pymobiledevice3/services/diagnostics.py
class DiagnosticsService(LockdownService):
    """
    Client for the device's ``diagnostics_relay`` lockdown service.

    Exposes the diagnostics relay over a plist request/response channel, allowing callers to:

    * Query MobileGestalt and IORegistry keys.
    * Read diagnostic information reports (e.g. battery, Wi-Fi, generic diagnostics).
    * Restart, shut down or put the device to sleep.
    """

    RSD_SERVICE_NAME = "com.apple.mobile.diagnostics_relay.shim.remote"
    SERVICE_NAME = "com.apple.mobile.diagnostics_relay"
    OLD_SERVICE_NAME = "com.apple.iosdiagnostics.relay"

    def __init__(self, lockdown: LockdownServiceProvider):
        """
        :param lockdown: Lockdown service provider used to establish the diagnostics relay connection. When it is a
            `LockdownClient`, the classic lockdown service name is used; otherwise the
            RemoteXPC (RSD) service name is used.
        """
        service_name = self.SERVICE_NAME if isinstance(lockdown, LockdownClient) else self.RSD_SERVICE_NAME
        super().__init__(lockdown, service_name, service=None)

    async def _send_recv(self, request: dict) -> dict:
        return await self.service.send_recv_plist(request)

    async def mobilegestalt(self, keys: Optional[list[str]] = None) -> dict:
        """
        Query MobileGestalt values from the device.

        Sends a ``MobileGestalt`` request and returns the resolved key/value mapping (with the internal ``Status``
        entry stripped out).

        :param keys: MobileGestalt keys to query. If ``None`` or empty, the full built-in `MobileGestaltKeys`
            list is queried.
        :returns: Mapping of each requested MobileGestalt key to its value.
        :raises DeprecationError: If MobileGestalt is reported as deprecated (iOS >= 17.4).
        :raises PyMobileDevice3Exception: If the request or the MobileGestalt query did not complete successfully.
        """
        if keys is None or len(keys) == 0:
            keys = MobileGestaltKeys
        response = await self._send_recv({"Request": "MobileGestalt", "MobileGestaltKeys": keys})

        if response["Diagnostics"]["MobileGestalt"]["Status"] == "MobileGestaltDeprecated":
            err_msg = f"failed to query MobileGestalt, MobileGestalt deprecated (iOS >= 17.4). Got response {response}"
            raise DeprecationError(err_msg)

        if (response["Status"] != "Success") or (response["Diagnostics"]["MobileGestalt"]["Status"] != "Success"):
            raise PyMobileDevice3Exception("failed to query MobileGestalt")

        response["Diagnostics"]["MobileGestalt"].pop("Status")

        return response["Diagnostics"]["MobileGestalt"]

    async def action(self, action: str) -> Optional[dict]:
        """
        Send a single diagnostics relay request and return its ``Diagnostics`` payload.

        This is the low-level primitive used by the action/info helpers below. The given string is sent verbatim as
        the request's ``Request`` field.

        :param action: Request name to send (e.g. ``Restart``, ``Shutdown``, ``Sleep``, ``All``).
        :returns: The response's ``Diagnostics`` payload, or ``None`` when the response carries no such payload.
        :raises PyMobileDevice3Exception: If the response status is not ``Success``.
        """
        response = await self._send_recv({"Request": action})
        if response["Status"] != "Success":
            raise PyMobileDevice3Exception(f"failed to perform action: {action}")
        return response.get("Diagnostics")

    async def restart(self):
        """
        Restart the device by sending the ``Restart`` request.

        :raises PyMobileDevice3Exception: If the request did not complete successfully.
        """
        await self.action("Restart")

    async def shutdown(self):
        """
        Shut the device down by sending the ``Shutdown`` request.

        :raises PyMobileDevice3Exception: If the request did not complete successfully.
        """
        await self.action("Shutdown")

    async def sleep(self):
        """
        Put the device to sleep by sending the ``Sleep`` request.

        :raises PyMobileDevice3Exception: If the request did not complete successfully.
        """
        await self.action("Sleep")

    async def info(self, diag_type: str = "All") -> dict:
        """
        Fetch a diagnostics information report from the device.

        :param diag_type: Diagnostics report type to request; sent as the ``Request`` field. Defaults to ``All``.
        :returns: The ``Diagnostics`` payload of the response.
        :raises PyMobileDevice3Exception: If the request did not complete successfully.
        """
        return await self.action(diag_type)

    async def ioregistry(self, plane: Optional[str] = None, name: Optional[str] = None, ioclass: Optional[str] = None):
        """
        Query the device's IORegistry.

        Sends an ``IORegistry`` request, optionally narrowed by plane, entry name and/or entry class.

        :param plane: IORegistry plane to traverse, sent as ``CurrentPlane`` (e.g. ``IODeviceTree``,
            ``IOService``). Omitted when falsy.
        :param name: Name of the registry entry to look up, sent as ``EntryName``. Omitted when falsy.
        :param ioclass: Class of the registry entry to look up, sent as ``EntryClass``. Omitted when falsy.
        :returns: The ``IORegistry`` portion of the response's ``Diagnostics`` payload, or ``None`` when the response
            carries no such payload.
        :raises PyMobileDevice3Exception: If the response status is not ``Success``.
        """
        d = {}

        if plane:
            d["CurrentPlane"] = plane

        if name:
            d["EntryName"] = name

        if ioclass:
            d["EntryClass"] = ioclass

        d["Request"] = "IORegistry"

        response = await self._send_recv(d)
        if response.get("Status") != "Success":
            raise PyMobileDevice3Exception(f"got invalid response: {response}")

        dd = response.get("Diagnostics")
        if dd:
            return dd.get("IORegistry")
        return None

    async def get_battery(self) -> dict:
        """
        Fetch battery/power-source information from the IORegistry.

        Convenience wrapper that queries the IORegistry entry of class ``IOPMPowerSource``.

        :returns: The IORegistry data for the power-source entry, or ``None`` if it is absent.
        :raises PyMobileDevice3Exception: If the underlying IORegistry request did not complete successfully.
        """
        return await self.ioregistry(ioclass="IOPMPowerSource")

    async def get_wifi(self) -> dict:
        """
        Fetch the Wi-Fi interface information from the IORegistry.

        First attempts the entry named ``AppleBCMWLANSkywalkInterface``; if that yields no result, falls back to the
        entry of class ``IO80211Interface``.

        :returns: The IORegistry data for the Wi-Fi interface, or ``None`` if neither lookup matches.
        :raises PyMobileDevice3Exception: If an underlying IORegistry request did not complete successfully.
        """
        result = await self.ioregistry(name="AppleBCMWLANSkywalkInterface")
        if result:
            return result
        return await self.ioregistry(ioclass="IO80211Interface")

mobilegestalt async

mobilegestalt(keys: Optional[list[str]] = None) -> dict

Query MobileGestalt values from the device.

Sends a MobileGestalt request and returns the resolved key/value mapping (with the internal Status entry stripped out).

Parameters:

Name Type Description Default
keys Optional[list[str]]

MobileGestalt keys to query. If None or empty, the full built-in MobileGestaltKeys list is queried.

None

Returns:

Type Description
dict

Mapping of each requested MobileGestalt key to its value.

Raises:

Type Description
DeprecationError

If MobileGestalt is reported as deprecated (iOS >= 17.4).

PyMobileDevice3Exception

If the request or the MobileGestalt query did not complete successfully.

Source code in pymobiledevice3/services/diagnostics.py
async def mobilegestalt(self, keys: Optional[list[str]] = None) -> dict:
    """
    Query MobileGestalt values from the device.

    Sends a ``MobileGestalt`` request and returns the resolved key/value mapping (with the internal ``Status``
    entry stripped out).

    :param keys: MobileGestalt keys to query. If ``None`` or empty, the full built-in `MobileGestaltKeys`
        list is queried.
    :returns: Mapping of each requested MobileGestalt key to its value.
    :raises DeprecationError: If MobileGestalt is reported as deprecated (iOS >= 17.4).
    :raises PyMobileDevice3Exception: If the request or the MobileGestalt query did not complete successfully.
    """
    if keys is None or len(keys) == 0:
        keys = MobileGestaltKeys
    response = await self._send_recv({"Request": "MobileGestalt", "MobileGestaltKeys": keys})

    if response["Diagnostics"]["MobileGestalt"]["Status"] == "MobileGestaltDeprecated":
        err_msg = f"failed to query MobileGestalt, MobileGestalt deprecated (iOS >= 17.4). Got response {response}"
        raise DeprecationError(err_msg)

    if (response["Status"] != "Success") or (response["Diagnostics"]["MobileGestalt"]["Status"] != "Success"):
        raise PyMobileDevice3Exception("failed to query MobileGestalt")

    response["Diagnostics"]["MobileGestalt"].pop("Status")

    return response["Diagnostics"]["MobileGestalt"]

action async

action(action: str) -> Optional[dict]

Send a single diagnostics relay request and return its Diagnostics payload.

This is the low-level primitive used by the action/info helpers below. The given string is sent verbatim as the request's Request field.

Parameters:

Name Type Description Default
action str

Request name to send (e.g. Restart, Shutdown, Sleep, All).

required

Returns:

Type Description
Optional[dict]

The response's Diagnostics payload, or None when the response carries no such payload.

Raises:

Type Description
PyMobileDevice3Exception

If the response status is not Success.

Source code in pymobiledevice3/services/diagnostics.py
async def action(self, action: str) -> Optional[dict]:
    """
    Send a single diagnostics relay request and return its ``Diagnostics`` payload.

    This is the low-level primitive used by the action/info helpers below. The given string is sent verbatim as
    the request's ``Request`` field.

    :param action: Request name to send (e.g. ``Restart``, ``Shutdown``, ``Sleep``, ``All``).
    :returns: The response's ``Diagnostics`` payload, or ``None`` when the response carries no such payload.
    :raises PyMobileDevice3Exception: If the response status is not ``Success``.
    """
    response = await self._send_recv({"Request": action})
    if response["Status"] != "Success":
        raise PyMobileDevice3Exception(f"failed to perform action: {action}")
    return response.get("Diagnostics")

restart async

restart()

Restart the device by sending the Restart request.

Raises:

Type Description
PyMobileDevice3Exception

If the request did not complete successfully.

Source code in pymobiledevice3/services/diagnostics.py
async def restart(self):
    """
    Restart the device by sending the ``Restart`` request.

    :raises PyMobileDevice3Exception: If the request did not complete successfully.
    """
    await self.action("Restart")

shutdown async

shutdown()

Shut the device down by sending the Shutdown request.

Raises:

Type Description
PyMobileDevice3Exception

If the request did not complete successfully.

Source code in pymobiledevice3/services/diagnostics.py
async def shutdown(self):
    """
    Shut the device down by sending the ``Shutdown`` request.

    :raises PyMobileDevice3Exception: If the request did not complete successfully.
    """
    await self.action("Shutdown")

sleep async

sleep()

Put the device to sleep by sending the Sleep request.

Raises:

Type Description
PyMobileDevice3Exception

If the request did not complete successfully.

Source code in pymobiledevice3/services/diagnostics.py
async def sleep(self):
    """
    Put the device to sleep by sending the ``Sleep`` request.

    :raises PyMobileDevice3Exception: If the request did not complete successfully.
    """
    await self.action("Sleep")

info async

info(diag_type: str = 'All') -> dict

Fetch a diagnostics information report from the device.

Parameters:

Name Type Description Default
diag_type str

Diagnostics report type to request; sent as the Request field. Defaults to All.

'All'

Returns:

Type Description
dict

The Diagnostics payload of the response.

Raises:

Type Description
PyMobileDevice3Exception

If the request did not complete successfully.

Source code in pymobiledevice3/services/diagnostics.py
async def info(self, diag_type: str = "All") -> dict:
    """
    Fetch a diagnostics information report from the device.

    :param diag_type: Diagnostics report type to request; sent as the ``Request`` field. Defaults to ``All``.
    :returns: The ``Diagnostics`` payload of the response.
    :raises PyMobileDevice3Exception: If the request did not complete successfully.
    """
    return await self.action(diag_type)

ioregistry async

ioregistry(plane: Optional[str] = None, name: Optional[str] = None, ioclass: Optional[str] = None)

Query the device's IORegistry.

Sends an IORegistry request, optionally narrowed by plane, entry name and/or entry class.

Parameters:

Name Type Description Default
plane Optional[str]

IORegistry plane to traverse, sent as CurrentPlane (e.g. IODeviceTree, IOService). Omitted when falsy.

None
name Optional[str]

Name of the registry entry to look up, sent as EntryName. Omitted when falsy.

None
ioclass Optional[str]

Class of the registry entry to look up, sent as EntryClass. Omitted when falsy.

None

Returns:

Type Description

The IORegistry portion of the response's Diagnostics payload, or None when the response carries no such payload.

Raises:

Type Description
PyMobileDevice3Exception

If the response status is not Success.

Source code in pymobiledevice3/services/diagnostics.py
async def ioregistry(self, plane: Optional[str] = None, name: Optional[str] = None, ioclass: Optional[str] = None):
    """
    Query the device's IORegistry.

    Sends an ``IORegistry`` request, optionally narrowed by plane, entry name and/or entry class.

    :param plane: IORegistry plane to traverse, sent as ``CurrentPlane`` (e.g. ``IODeviceTree``,
        ``IOService``). Omitted when falsy.
    :param name: Name of the registry entry to look up, sent as ``EntryName``. Omitted when falsy.
    :param ioclass: Class of the registry entry to look up, sent as ``EntryClass``. Omitted when falsy.
    :returns: The ``IORegistry`` portion of the response's ``Diagnostics`` payload, or ``None`` when the response
        carries no such payload.
    :raises PyMobileDevice3Exception: If the response status is not ``Success``.
    """
    d = {}

    if plane:
        d["CurrentPlane"] = plane

    if name:
        d["EntryName"] = name

    if ioclass:
        d["EntryClass"] = ioclass

    d["Request"] = "IORegistry"

    response = await self._send_recv(d)
    if response.get("Status") != "Success":
        raise PyMobileDevice3Exception(f"got invalid response: {response}")

    dd = response.get("Diagnostics")
    if dd:
        return dd.get("IORegistry")
    return None

get_battery async

get_battery() -> dict

Fetch battery/power-source information from the IORegistry.

Convenience wrapper that queries the IORegistry entry of class IOPMPowerSource.

Returns:

Type Description
dict

The IORegistry data for the power-source entry, or None if it is absent.

Raises:

Type Description
PyMobileDevice3Exception

If the underlying IORegistry request did not complete successfully.

Source code in pymobiledevice3/services/diagnostics.py
async def get_battery(self) -> dict:
    """
    Fetch battery/power-source information from the IORegistry.

    Convenience wrapper that queries the IORegistry entry of class ``IOPMPowerSource``.

    :returns: The IORegistry data for the power-source entry, or ``None`` if it is absent.
    :raises PyMobileDevice3Exception: If the underlying IORegistry request did not complete successfully.
    """
    return await self.ioregistry(ioclass="IOPMPowerSource")

get_wifi async

get_wifi() -> dict

Fetch the Wi-Fi interface information from the IORegistry.

First attempts the entry named AppleBCMWLANSkywalkInterface; if that yields no result, falls back to the entry of class IO80211Interface.

Returns:

Type Description
dict

The IORegistry data for the Wi-Fi interface, or None if neither lookup matches.

Raises:

Type Description
PyMobileDevice3Exception

If an underlying IORegistry request did not complete successfully.

Source code in pymobiledevice3/services/diagnostics.py
async def get_wifi(self) -> dict:
    """
    Fetch the Wi-Fi interface information from the IORegistry.

    First attempts the entry named ``AppleBCMWLANSkywalkInterface``; if that yields no result, falls back to the
    entry of class ``IO80211Interface``.

    :returns: The IORegistry data for the Wi-Fi interface, or ``None`` if neither lookup matches.
    :raises PyMobileDevice3Exception: If an underlying IORegistry request did not complete successfully.
    """
    result = await self.ioregistry(name="AppleBCMWLANSkywalkInterface")
    if result:
        return result
    return await self.ioregistry(ioclass="IO80211Interface")

pymobiledevice3.services.springboard.SpringBoardServicesService

Bases: LockdownService

Client for the com.apple.springboardservices lockdown service.

Exposes SpringBoard operations such as reading and writing the home screen icon layout, fetching application icon and wallpaper images, and querying the interface orientation.

Source code in pymobiledevice3/services/springboard.py
class SpringBoardServicesService(LockdownService):
    """
    Client for the ``com.apple.springboardservices`` lockdown service.

    Exposes SpringBoard operations such as reading and writing the home screen icon layout,
    fetching application icon and wallpaper images, and querying the interface orientation.
    """

    RSD_SERVICE_NAME = "com.apple.springboardservices.shim.remote"
    SERVICE_NAME = "com.apple.springboardservices"

    def __init__(self, lockdown: LockdownServiceProvider) -> None:
        if isinstance(lockdown, LockdownClient):
            super().__init__(lockdown, self.SERVICE_NAME)
        else:
            super().__init__(lockdown, self.RSD_SERVICE_NAME)

    async def get_icon_state(self, format_version: str = "2") -> list:
        """
        Retrieve the current home screen icon layout.

        :param format_version: Icon state format version sent to SpringBoard as ``formatVersion``.
            When falsy, the key is omitted from the request.
        :returns: Nested list describing the home screen pages, folders and icons.
        """
        cmd = {"command": "getIconState"}
        if format_version:
            cmd["formatVersion"] = format_version
        return await self.service.send_recv_plist(cmd)

    async def set_icon_state(self, newstate: Optional[list] = None) -> None:
        """
        Apply a new home screen icon layout.

        :param newstate: Icon layout in the same structure returned by `get_icon_state`.
            When ``None``, an empty layout is sent.
        """
        if newstate is None:
            newstate = {}
        await self.service.send_recv_prefixed(build_plist({"command": "setIconState", "iconState": newstate}))

    async def get_icon_pngdata(self, bundle_id: str) -> bytes:
        """
        Retrieve the home screen icon image of an installed application.

        :param bundle_id: Bundle identifier of the application whose icon is requested.
        :returns: PNG-encoded icon image bytes, or ``None`` if no ``pngData`` is returned.
        """
        return (await self.service.send_recv_plist({"command": "getIconPNGData", "bundleId": bundle_id})).get("pngData")

    async def get_interface_orientation(self) -> InterfaceOrientation:
        """
        Query the current SpringBoard interface orientation.

        :returns: The current orientation as an `InterfaceOrientation` enum value.
        """
        res = await self.service.send_recv_plist({"command": "getInterfaceOrientation"})
        return InterfaceOrientation(res.get("interfaceOrientation"))

    async def get_wallpaper_pngdata(self) -> bytes:
        """
        Retrieve the current home screen wallpaper image.

        :returns: PNG-encoded wallpaper image bytes, or ``None`` if no ``pngData`` is returned.
        """
        return (await self.service.send_recv_plist({"command": "getHomeScreenWallpaperPNGData"})).get("pngData")

    async def get_homescreen_icon_metrics(self) -> dict[str, float]:
        """
        Retrieve the home screen icon layout metrics.

        :returns: Mapping of metric names to their numeric values, as reported by SpringBoard.
        """
        return await self.service.send_recv_plist({"command": "getHomeScreenIconMetrics"})

    async def get_wallpaper_info(self, wallpaper_name: str) -> dict:
        """
        Retrieve metadata about a named wallpaper.

        :param wallpaper_name: Name of the wallpaper to query.
        :returns: Mapping describing the wallpaper, as reported by SpringBoard.
        """
        return await self.service.send_recv_plist({"command": "getWallpaperInfo", "wallpaperName": wallpaper_name})

    async def reload_icon_state(self) -> None:
        """
        Re-apply the current icon layout by reading it back and writing it unchanged.

        Fetches the current icon state via `get_icon_state` and immediately sends it back
        through `set_icon_state`, forcing SpringBoard to reload its layout.
        """
        await self.set_icon_state(await self.get_icon_state())

    async def get_wallpaper_preview_image(self, wallpaper_name: str) -> bytes:
        """
        Retrieve the preview image for a named wallpaper.

        :param wallpaper_name: Name of the wallpaper whose preview image is requested.
        :returns: PNG-encoded preview image bytes.
        """
        return (
            await self.service.send_recv_plist({"command": "getWallpaperPreviewImage", "wallpaperName": wallpaper_name})
        )["pngData"]

get_icon_state async

get_icon_state(format_version: str = '2') -> list

Retrieve the current home screen icon layout.

Parameters:

Name Type Description Default
format_version str

Icon state format version sent to SpringBoard as formatVersion. When falsy, the key is omitted from the request.

'2'

Returns:

Type Description
list

Nested list describing the home screen pages, folders and icons.

Source code in pymobiledevice3/services/springboard.py
async def get_icon_state(self, format_version: str = "2") -> list:
    """
    Retrieve the current home screen icon layout.

    :param format_version: Icon state format version sent to SpringBoard as ``formatVersion``.
        When falsy, the key is omitted from the request.
    :returns: Nested list describing the home screen pages, folders and icons.
    """
    cmd = {"command": "getIconState"}
    if format_version:
        cmd["formatVersion"] = format_version
    return await self.service.send_recv_plist(cmd)

set_icon_state async

set_icon_state(newstate: Optional[list] = None) -> None

Apply a new home screen icon layout.

Parameters:

Name Type Description Default
newstate Optional[list]

Icon layout in the same structure returned by get_icon_state. When None, an empty layout is sent.

None
Source code in pymobiledevice3/services/springboard.py
async def set_icon_state(self, newstate: Optional[list] = None) -> None:
    """
    Apply a new home screen icon layout.

    :param newstate: Icon layout in the same structure returned by `get_icon_state`.
        When ``None``, an empty layout is sent.
    """
    if newstate is None:
        newstate = {}
    await self.service.send_recv_prefixed(build_plist({"command": "setIconState", "iconState": newstate}))

get_icon_pngdata async

get_icon_pngdata(bundle_id: str) -> bytes

Retrieve the home screen icon image of an installed application.

Parameters:

Name Type Description Default
bundle_id str

Bundle identifier of the application whose icon is requested.

required

Returns:

Type Description
bytes

PNG-encoded icon image bytes, or None if no pngData is returned.

Source code in pymobiledevice3/services/springboard.py
async def get_icon_pngdata(self, bundle_id: str) -> bytes:
    """
    Retrieve the home screen icon image of an installed application.

    :param bundle_id: Bundle identifier of the application whose icon is requested.
    :returns: PNG-encoded icon image bytes, or ``None`` if no ``pngData`` is returned.
    """
    return (await self.service.send_recv_plist({"command": "getIconPNGData", "bundleId": bundle_id})).get("pngData")

get_interface_orientation async

get_interface_orientation() -> InterfaceOrientation

Query the current SpringBoard interface orientation.

Returns:

Type Description
InterfaceOrientation

The current orientation as an InterfaceOrientation enum value.

Source code in pymobiledevice3/services/springboard.py
async def get_interface_orientation(self) -> InterfaceOrientation:
    """
    Query the current SpringBoard interface orientation.

    :returns: The current orientation as an `InterfaceOrientation` enum value.
    """
    res = await self.service.send_recv_plist({"command": "getInterfaceOrientation"})
    return InterfaceOrientation(res.get("interfaceOrientation"))

get_wallpaper_pngdata async

get_wallpaper_pngdata() -> bytes

Retrieve the current home screen wallpaper image.

Returns:

Type Description
bytes

PNG-encoded wallpaper image bytes, or None if no pngData is returned.

Source code in pymobiledevice3/services/springboard.py
async def get_wallpaper_pngdata(self) -> bytes:
    """
    Retrieve the current home screen wallpaper image.

    :returns: PNG-encoded wallpaper image bytes, or ``None`` if no ``pngData`` is returned.
    """
    return (await self.service.send_recv_plist({"command": "getHomeScreenWallpaperPNGData"})).get("pngData")

get_homescreen_icon_metrics async

get_homescreen_icon_metrics() -> dict[str, float]

Retrieve the home screen icon layout metrics.

Returns:

Type Description
dict[str, float]

Mapping of metric names to their numeric values, as reported by SpringBoard.

Source code in pymobiledevice3/services/springboard.py
async def get_homescreen_icon_metrics(self) -> dict[str, float]:
    """
    Retrieve the home screen icon layout metrics.

    :returns: Mapping of metric names to their numeric values, as reported by SpringBoard.
    """
    return await self.service.send_recv_plist({"command": "getHomeScreenIconMetrics"})

get_wallpaper_info async

get_wallpaper_info(wallpaper_name: str) -> dict

Retrieve metadata about a named wallpaper.

Parameters:

Name Type Description Default
wallpaper_name str

Name of the wallpaper to query.

required

Returns:

Type Description
dict

Mapping describing the wallpaper, as reported by SpringBoard.

Source code in pymobiledevice3/services/springboard.py
async def get_wallpaper_info(self, wallpaper_name: str) -> dict:
    """
    Retrieve metadata about a named wallpaper.

    :param wallpaper_name: Name of the wallpaper to query.
    :returns: Mapping describing the wallpaper, as reported by SpringBoard.
    """
    return await self.service.send_recv_plist({"command": "getWallpaperInfo", "wallpaperName": wallpaper_name})

reload_icon_state async

reload_icon_state() -> None

Re-apply the current icon layout by reading it back and writing it unchanged.

Fetches the current icon state via get_icon_state and immediately sends it back through set_icon_state, forcing SpringBoard to reload its layout.

Source code in pymobiledevice3/services/springboard.py
async def reload_icon_state(self) -> None:
    """
    Re-apply the current icon layout by reading it back and writing it unchanged.

    Fetches the current icon state via `get_icon_state` and immediately sends it back
    through `set_icon_state`, forcing SpringBoard to reload its layout.
    """
    await self.set_icon_state(await self.get_icon_state())

get_wallpaper_preview_image async

get_wallpaper_preview_image(wallpaper_name: str) -> bytes

Retrieve the preview image for a named wallpaper.

Parameters:

Name Type Description Default
wallpaper_name str

Name of the wallpaper whose preview image is requested.

required

Returns:

Type Description
bytes

PNG-encoded preview image bytes.

Source code in pymobiledevice3/services/springboard.py
async def get_wallpaper_preview_image(self, wallpaper_name: str) -> bytes:
    """
    Retrieve the preview image for a named wallpaper.

    :param wallpaper_name: Name of the wallpaper whose preview image is requested.
    :returns: PNG-encoded preview image bytes.
    """
    return (
        await self.service.send_recv_plist({"command": "getWallpaperPreviewImage", "wallpaperName": wallpaper_name})
    )["pngData"]

pymobiledevice3.services.crash_reports.CrashReportsManager

Manage crash reports stored on a device.

Wraps the com.apple.crashreportcopymobile AFC service (for listing, reading, pulling and erasing reports) and the com.apple.crashreportmover service (for flushing pending reports). Must be used as an async context manager (async with); the synchronous context manager protocol is intentionally disabled.

Source code in pymobiledevice3/services/crash_reports.py
class CrashReportsManager:
    """
    Manage crash reports stored on a device.

    Wraps the ``com.apple.crashreportcopymobile`` AFC service (for listing, reading, pulling and
    erasing reports) and the ``com.apple.crashreportmover`` service (for flushing pending reports).
    Must be used as an async context manager (``async with``); the synchronous context manager
    protocol is intentionally disabled.
    """

    COPY_MOBILE_NAME = "com.apple.crashreportcopymobile"
    RSD_COPY_MOBILE_NAME = "com.apple.crashreportcopymobile.shim.remote"

    CRASH_MOVER_NAME = "com.apple.crashreportmover"
    RSD_CRASH_MOVER_NAME = "com.apple.crashreportmover.shim.remote"

    APPSTORED_PATH = "/com.apple.appstored"
    IN_PROGRESS_SYSDIAGNOSE_EXTENSIONS: ClassVar = [".tmp", ".tar.gz"]

    def __init__(self, lockdown: LockdownServiceProvider) -> None:
        self.logger = logging.getLogger(__name__)
        self.lockdown = lockdown

        if isinstance(lockdown, LockdownClient):
            self.copy_mobile_service_name = self.COPY_MOBILE_NAME
            self.crash_mover_service_name = self.CRASH_MOVER_NAME
        else:
            self.copy_mobile_service_name = self.RSD_COPY_MOBILE_NAME
            self.crash_mover_service_name = self.RSD_CRASH_MOVER_NAME

        self.afc = AfcService(lockdown, service_name=self.copy_mobile_service_name)

    def __enter__(self):
        raise RuntimeError("Use async context manager: `async with ...`")

    async def __aenter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        raise RuntimeError("Use async context manager: `async with ...`")

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()

    async def aclose(self) -> None:
        """Close the underlying AFC connection."""
        await self.afc.aclose()

    async def clear(self, path: str = "/") -> None:
        """
        Delete all crash reports under a path.

        Lists the immediate children of ``path`` and removes each recursively. A
        ``com.apple.appstored`` entry that the device may recreate immediately after deletion is
        tolerated; any other item that could not be deleted is treated as an error.

        :param path: Path under the crash reports directory to clear. Defaults to '/'.
        :raises AfcException: If any item other than ``com.apple.appstored`` could not be deleted.
        """
        undeleted_items = []
        for filename in await self.ls(path, depth=1):
            undeleted_items.extend(await self.afc.rm(filename, force=True))

        for item in undeleted_items:
            # special case of file that sometimes created automatically right after delete,
            # and then we can't delete the folder because it's not empty
            if item != self.APPSTORED_PATH:
                raise AfcException(
                    f"failed to clear crash reports under {path!r}, undeleted items: {undeleted_items}", None
                )

    async def ls(self, path: str = "/", depth: int = 1) -> list[str]:
        """
        List file and folder in the crash report's directory.
        :param path: Path to list, relative to the crash report's directory.
        :param depth: Listing depth, -1 to list infinite.
        :return: List of files listed.
        """
        result = []
        async for item in self.afc.dirlist(path, depth):
            result.append(item)
        return result[1:]  # skip the root path '/'

    async def parse(self, path: str = "/") -> CrashReportBase:
        """
        Parse a crash report file and return the parsed crash report object.

        :param path: Path to a crash report file.
        :return: Parsed crash report object.
        """
        return get_crash_report_from_buf((await self.afc.get_file_contents(path)).decode(), filename=path)

    async def parse_latest(
        self,
        path: str = "/",
        match: Optional[list[str]] = None,
        match_insensitive: Optional[list[str]] = None,
        count: int = 1,
    ) -> list[CrashReportBase]:
        """
        Parse latest top-level crash report(s) under a path, optionally filtered by basename regex patterns.

        Scans the top level of the given path, filters regular files by basename,
        and returns matches sorted by last modification time (newest first).

        :param path: Path whose top-level entries should be considered. Defaults to '/'
        :param match: Case-sensitive regex patterns over report basename.
        :param match_insensitive: Case-insensitive regex patterns over report basename.
        :param count: Maximum number of latest matching reports to parse.
        :return: Parsed crash report objects ordered from newest to oldest.
                 Result length is between 1 and count.
        :raises ValueError: If count < 1 or if no reports match the filters.

        All provided patterns must match.
        """

        def get_match_arguments_description():
            return ", ".join(
                f"{name}={value!r}"
                for name, value in (("match", match_patterns), ("match_insensitive", match_insensitive_patterns))
                if value
            )

        if count < 1:
            raise ValueError("count must be >= 1")

        match_patterns = match or []
        match_insensitive_patterns = match_insensitive or []

        patterns = [re.compile(pattern) for pattern in match_patterns]
        patterns.extend(re.compile(pattern, re.IGNORECASE) for pattern in match_insensitive_patterns)

        matching_entries_by_mtime = sorted(
            [
                (stat["st_mtime"].timestamp(), entry)
                async for entry in self.afc.dirlist(path, depth=1)
                if entry != path
                and (stat := await self.afc.stat(entry)).get("st_ifmt") == "S_IFREG"
                and (not patterns or all(pattern.search(posixpath.basename(entry)) for pattern in patterns))
            ],
            reverse=True,
        )

        if len(matching_entries_by_mtime) == 0:
            match_arguments_description = get_match_arguments_description()
            raise ValueError(
                f"No reports found ({match_arguments_description})"
                if match_arguments_description
                else "No reports found"
            )

        return [await self.parse(report_path) for _, report_path in matching_entries_by_mtime[:count]]

    async def pull(
        self, out: str, entry: str = "/", erase: bool = False, match: Optional[str] = None, progress_bar: bool = True
    ) -> None:
        """
        Pull crash reports from the device.
        :param out: Directory to pull crash reports to.
        :param entry: File or Folder to pull.
        :param erase: Whether to erase the original file from the CrashReports directory.
        :param match: Regex to match against file and directory names to pull.
        :param progress_bar: Whether to show a progress bar when pulling large files.
        """

        async def _callback(src: str, dst: str) -> None:
            self.logger.info(f"{src} --> {dst}")
            if erase:
                await self.afc.rm_single(src, force=True)

        match = None if match is None else re.compile(match)
        await self.afc.pull(entry, out, match, callback=_callback, progress_bar=progress_bar, ignore_errors=True)

    async def flush(self) -> None:
        """
        Flush pending crash products into the CrashReports directory.

        Connects to the ``com.apple.crashreportmover`` service, which moves any pending reports into
        the CrashReports directory and acknowledges with a ``ping`` once done.
        """
        ack = b"ping\x00"
        service = await self.lockdown.start_lockdown_service(self.crash_mover_service_name)
        assert ack == await service.recvall(len(ack))

    async def watch(self, name: Optional[str] = None, raw: bool = False) -> AsyncGenerator[str, None]:
        """
        Continuously monitor the syslog and yield each newly created crash report.

        Watches the device syslog for ``osanalyticshelper`` "Saved type" messages, reads the
        referenced ``.ips`` or ``.panic`` file (retrying until it becomes readable) and yields it.

        :param name: If provided, only reports whose parsed process name equals this value are
            yielded; otherwise every new report is yielded.
        :param raw: When ``True``, yield the raw report text; when ``False``, yield the parsed
            crash report object.
        :returns: Async generator yielding either raw report strings or parsed crash report objects.
        """
        async for syslog_entry in OsTraceService(lockdown=self.lockdown).syslog():
            if (
                (posixpath.basename(syslog_entry.filename) != "osanalyticshelper")
                or (posixpath.basename(syslog_entry.image_name) != "OSAnalytics")
                or not syslog_entry.message.startswith("Saved type ")
            ):
                # skip non-ips creation syslog lines
                continue

            filename = posixpath.basename(syslog_entry.message.split()[-1])
            self.logger.debug(f"crash report: {filename}")

            if posixpath.splitext(filename)[-1] not in (".ips", ".panic"):
                continue

            while True:
                try:
                    crash_report_raw = (await self.afc.get_file_contents(filename)).decode()
                    crash_report = get_crash_report_from_buf(crash_report_raw, filename=filename)
                    break
                except (AfcFileNotFoundError, JSONDecodeError):
                    # Sometimes we have to wait for the file to be readable
                    pass

            if name is None or crash_report.name == name:
                if raw:
                    yield crash_report_raw
                else:
                    yield crash_report

    async def get_new_sysdiagnose(
        self,
        out: str,
        erase: bool = True,
        *,
        timeout: Optional[float] = None,
        callback: Optional[Callable[[float], None]] = None,
    ) -> None:
        """
        Monitor the creation of a newly created sysdiagnose archive and pull it
        :param out: filename
        :param erase: remove after pulling
        :keyword timeout: Maximum time in seconds to wait for the completion of sysdiagnose archive
            If None (default), waits indefinitely
        :keyword callback: optional callback function (form: func(float)) that accepts the elapsed time so far
        """
        start_time = time.monotonic()
        end_time = None
        if timeout is not None:
            end_time = start_time + timeout
        sysdiagnose_filename = await self._get_new_sysdiagnose_filename(end_time)

        if callback is not None:
            callback(time.monotonic() - start_time)

        self.logger.info("sysdiagnose tarball creation has been started")
        await self._wait_for_sysdiagnose_to_finish(timeout)

        if callback is not None:
            callback(time.monotonic() - start_time)

        await self.pull(out, entry=sysdiagnose_filename, erase=erase)

        if callback is not None:
            callback(time.monotonic() - start_time)

    async def _wait_for_sysdiagnose_to_finish(self, end_time: Optional[float] = None) -> None:
        async with NotificationProxyService(self.lockdown, timeout=end_time) as service:
            stop_notification = "com.apple.sysdiagnose.sysdiagnoseStopped"
            await service.notify_register_dispatch(stop_notification)
            try:
                async for event in service.receive_notification():
                    if event["Name"] != stop_notification:
                        continue
                    self.logger.debug(f"Received {event}")
                    await asyncio.sleep(IOS17_SYSDIAGNOSE_DELAY)
                    break
            except NotificationTimeoutError as e:
                raise SysdiagnoseTimeoutError("Timeout waiting for sysdiagnose completion") from e

    async def _get_new_sysdiagnose_filename(self, end_time: Optional[float] = None) -> str:
        sysdiagnose_filename = None
        excluded_temp_files = []

        while sysdiagnose_filename is None:
            try:
                for filename in await self.afc.listdir(SYSDIAGNOSE_DIR):
                    # search for an IN_PROGRESS archive
                    if filename not in excluded_temp_files and "IN_PROGRESS_" in filename:
                        for ext in self.IN_PROGRESS_SYSDIAGNOSE_EXTENSIONS:
                            if filename.endswith(ext):
                                delta = (
                                    await self.lockdown.get_date()
                                    - (await self.afc.stat(posixpath.join(SYSDIAGNOSE_DIR, filename)))["st_mtime"]
                                )
                                # Ignores IN_PROGRESS sysdiagnose files older than the defined time to live
                                if delta.total_seconds() < SYSDIAGNOSE_IN_PROGRESS_MAX_TTL_SECS:
                                    self.logger.debug(f"Detected in progress sysdiagnose {filename}")
                                    sysdiagnose_filename = filename.rsplit(ext)[0]
                                    sysdiagnose_filename = sysdiagnose_filename.replace("IN_PROGRESS_", "")
                                    sysdiagnose_filename = f"{sysdiagnose_filename}.tar.gz"
                                    return posixpath.join(SYSDIAGNOSE_DIR, sysdiagnose_filename)
                                else:
                                    self.logger.warning(f"Old sysdiagnose temp file ignored {filename}")
                                    excluded_temp_files.append(filename)
            except AfcException:
                pass

            if self._check_timeout(end_time):
                raise SysdiagnoseTimeoutError("Timeout finding in-progress sysdiagnose filename")
            await asyncio.sleep(0.1)

    def _check_timeout(self, end_time: Optional[float] = None) -> bool:
        return end_time is not None and time.monotonic() > end_time

aclose async

aclose() -> None

Close the underlying AFC connection.

Source code in pymobiledevice3/services/crash_reports.py
async def aclose(self) -> None:
    """Close the underlying AFC connection."""
    await self.afc.aclose()

clear async

clear(path: str = '/') -> None

Delete all crash reports under a path.

Lists the immediate children of path and removes each recursively. A com.apple.appstored entry that the device may recreate immediately after deletion is tolerated; any other item that could not be deleted is treated as an error.

Parameters:

Name Type Description Default
path str

Path under the crash reports directory to clear. Defaults to '/'.

'/'

Raises:

Type Description
AfcException

If any item other than com.apple.appstored could not be deleted.

Source code in pymobiledevice3/services/crash_reports.py
async def clear(self, path: str = "/") -> None:
    """
    Delete all crash reports under a path.

    Lists the immediate children of ``path`` and removes each recursively. A
    ``com.apple.appstored`` entry that the device may recreate immediately after deletion is
    tolerated; any other item that could not be deleted is treated as an error.

    :param path: Path under the crash reports directory to clear. Defaults to '/'.
    :raises AfcException: If any item other than ``com.apple.appstored`` could not be deleted.
    """
    undeleted_items = []
    for filename in await self.ls(path, depth=1):
        undeleted_items.extend(await self.afc.rm(filename, force=True))

    for item in undeleted_items:
        # special case of file that sometimes created automatically right after delete,
        # and then we can't delete the folder because it's not empty
        if item != self.APPSTORED_PATH:
            raise AfcException(
                f"failed to clear crash reports under {path!r}, undeleted items: {undeleted_items}", None
            )

ls async

ls(path: str = '/', depth: int = 1) -> list[str]

List file and folder in the crash report's directory.

Parameters:

Name Type Description Default
path str

Path to list, relative to the crash report's directory.

'/'
depth int

Listing depth, -1 to list infinite.

1

Returns:

Type Description
list[str]

List of files listed.

Source code in pymobiledevice3/services/crash_reports.py
async def ls(self, path: str = "/", depth: int = 1) -> list[str]:
    """
    List file and folder in the crash report's directory.
    :param path: Path to list, relative to the crash report's directory.
    :param depth: Listing depth, -1 to list infinite.
    :return: List of files listed.
    """
    result = []
    async for item in self.afc.dirlist(path, depth):
        result.append(item)
    return result[1:]  # skip the root path '/'

parse async

parse(path: str = '/') -> CrashReportBase

Parse a crash report file and return the parsed crash report object.

Parameters:

Name Type Description Default
path str

Path to a crash report file.

'/'

Returns:

Type Description
CrashReportBase

Parsed crash report object.

Source code in pymobiledevice3/services/crash_reports.py
async def parse(self, path: str = "/") -> CrashReportBase:
    """
    Parse a crash report file and return the parsed crash report object.

    :param path: Path to a crash report file.
    :return: Parsed crash report object.
    """
    return get_crash_report_from_buf((await self.afc.get_file_contents(path)).decode(), filename=path)

parse_latest async

parse_latest(path: str = '/', match: Optional[list[str]] = None, match_insensitive: Optional[list[str]] = None, count: int = 1) -> list[CrashReportBase]

Parse latest top-level crash report(s) under a path, optionally filtered by basename regex patterns.

Scans the top level of the given path, filters regular files by basename, and returns matches sorted by last modification time (newest first).

Parameters:

Name Type Description Default
path str

Path whose top-level entries should be considered. Defaults to '/'

'/'
match Optional[list[str]]

Case-sensitive regex patterns over report basename.

None
match_insensitive Optional[list[str]]

Case-insensitive regex patterns over report basename.

None
count int

Maximum number of latest matching reports to parse.

1

Returns:

Type Description
list[CrashReportBase]

Parsed crash report objects ordered from newest to oldest. Result length is between 1 and count.

Raises:

Type Description
ValueError

If count < 1 or if no reports match the filters.

All provided patterns must match.

Source code in pymobiledevice3/services/crash_reports.py
async def parse_latest(
    self,
    path: str = "/",
    match: Optional[list[str]] = None,
    match_insensitive: Optional[list[str]] = None,
    count: int = 1,
) -> list[CrashReportBase]:
    """
    Parse latest top-level crash report(s) under a path, optionally filtered by basename regex patterns.

    Scans the top level of the given path, filters regular files by basename,
    and returns matches sorted by last modification time (newest first).

    :param path: Path whose top-level entries should be considered. Defaults to '/'
    :param match: Case-sensitive regex patterns over report basename.
    :param match_insensitive: Case-insensitive regex patterns over report basename.
    :param count: Maximum number of latest matching reports to parse.
    :return: Parsed crash report objects ordered from newest to oldest.
             Result length is between 1 and count.
    :raises ValueError: If count < 1 or if no reports match the filters.

    All provided patterns must match.
    """

    def get_match_arguments_description():
        return ", ".join(
            f"{name}={value!r}"
            for name, value in (("match", match_patterns), ("match_insensitive", match_insensitive_patterns))
            if value
        )

    if count < 1:
        raise ValueError("count must be >= 1")

    match_patterns = match or []
    match_insensitive_patterns = match_insensitive or []

    patterns = [re.compile(pattern) for pattern in match_patterns]
    patterns.extend(re.compile(pattern, re.IGNORECASE) for pattern in match_insensitive_patterns)

    matching_entries_by_mtime = sorted(
        [
            (stat["st_mtime"].timestamp(), entry)
            async for entry in self.afc.dirlist(path, depth=1)
            if entry != path
            and (stat := await self.afc.stat(entry)).get("st_ifmt") == "S_IFREG"
            and (not patterns or all(pattern.search(posixpath.basename(entry)) for pattern in patterns))
        ],
        reverse=True,
    )

    if len(matching_entries_by_mtime) == 0:
        match_arguments_description = get_match_arguments_description()
        raise ValueError(
            f"No reports found ({match_arguments_description})"
            if match_arguments_description
            else "No reports found"
        )

    return [await self.parse(report_path) for _, report_path in matching_entries_by_mtime[:count]]

pull async

pull(out: str, entry: str = '/', erase: bool = False, match: Optional[str] = None, progress_bar: bool = True) -> None

Pull crash reports from the device.

Parameters:

Name Type Description Default
out str

Directory to pull crash reports to.

required
entry str

File or Folder to pull.

'/'
erase bool

Whether to erase the original file from the CrashReports directory.

False
match Optional[str]

Regex to match against file and directory names to pull.

None
progress_bar bool

Whether to show a progress bar when pulling large files.

True
Source code in pymobiledevice3/services/crash_reports.py
async def pull(
    self, out: str, entry: str = "/", erase: bool = False, match: Optional[str] = None, progress_bar: bool = True
) -> None:
    """
    Pull crash reports from the device.
    :param out: Directory to pull crash reports to.
    :param entry: File or Folder to pull.
    :param erase: Whether to erase the original file from the CrashReports directory.
    :param match: Regex to match against file and directory names to pull.
    :param progress_bar: Whether to show a progress bar when pulling large files.
    """

    async def _callback(src: str, dst: str) -> None:
        self.logger.info(f"{src} --> {dst}")
        if erase:
            await self.afc.rm_single(src, force=True)

    match = None if match is None else re.compile(match)
    await self.afc.pull(entry, out, match, callback=_callback, progress_bar=progress_bar, ignore_errors=True)

flush async

flush() -> None

Flush pending crash products into the CrashReports directory.

Connects to the com.apple.crashreportmover service, which moves any pending reports into the CrashReports directory and acknowledges with a ping once done.

Source code in pymobiledevice3/services/crash_reports.py
async def flush(self) -> None:
    """
    Flush pending crash products into the CrashReports directory.

    Connects to the ``com.apple.crashreportmover`` service, which moves any pending reports into
    the CrashReports directory and acknowledges with a ``ping`` once done.
    """
    ack = b"ping\x00"
    service = await self.lockdown.start_lockdown_service(self.crash_mover_service_name)
    assert ack == await service.recvall(len(ack))

watch async

watch(name: Optional[str] = None, raw: bool = False) -> AsyncGenerator[str, None]

Continuously monitor the syslog and yield each newly created crash report.

Watches the device syslog for osanalyticshelper "Saved type" messages, reads the referenced .ips or .panic file (retrying until it becomes readable) and yields it.

Parameters:

Name Type Description Default
name Optional[str]

If provided, only reports whose parsed process name equals this value are yielded; otherwise every new report is yielded.

None
raw bool

When True, yield the raw report text; when False, yield the parsed crash report object.

False

Returns:

Type Description
AsyncGenerator[str, None]

Async generator yielding either raw report strings or parsed crash report objects.

Source code in pymobiledevice3/services/crash_reports.py
async def watch(self, name: Optional[str] = None, raw: bool = False) -> AsyncGenerator[str, None]:
    """
    Continuously monitor the syslog and yield each newly created crash report.

    Watches the device syslog for ``osanalyticshelper`` "Saved type" messages, reads the
    referenced ``.ips`` or ``.panic`` file (retrying until it becomes readable) and yields it.

    :param name: If provided, only reports whose parsed process name equals this value are
        yielded; otherwise every new report is yielded.
    :param raw: When ``True``, yield the raw report text; when ``False``, yield the parsed
        crash report object.
    :returns: Async generator yielding either raw report strings or parsed crash report objects.
    """
    async for syslog_entry in OsTraceService(lockdown=self.lockdown).syslog():
        if (
            (posixpath.basename(syslog_entry.filename) != "osanalyticshelper")
            or (posixpath.basename(syslog_entry.image_name) != "OSAnalytics")
            or not syslog_entry.message.startswith("Saved type ")
        ):
            # skip non-ips creation syslog lines
            continue

        filename = posixpath.basename(syslog_entry.message.split()[-1])
        self.logger.debug(f"crash report: {filename}")

        if posixpath.splitext(filename)[-1] not in (".ips", ".panic"):
            continue

        while True:
            try:
                crash_report_raw = (await self.afc.get_file_contents(filename)).decode()
                crash_report = get_crash_report_from_buf(crash_report_raw, filename=filename)
                break
            except (AfcFileNotFoundError, JSONDecodeError):
                # Sometimes we have to wait for the file to be readable
                pass

        if name is None or crash_report.name == name:
            if raw:
                yield crash_report_raw
            else:
                yield crash_report

get_new_sysdiagnose async

get_new_sysdiagnose(out: str, erase: bool = True, *, timeout: Optional[float] = None, callback: Optional[Callable[[float], None]] = None) -> None

Monitor the creation of a newly created sysdiagnose archive and pull it

Parameters:

Name Type Description Default
out str

filename

required
erase bool

remove after pulling

True
timeout Optional[float]

Maximum time in seconds to wait for the completion of sysdiagnose archive If None (default), waits indefinitely

None
callback Optional[Callable[[float], None]]

optional callback function (form: func(float)) that accepts the elapsed time so far

None
Source code in pymobiledevice3/services/crash_reports.py
async def get_new_sysdiagnose(
    self,
    out: str,
    erase: bool = True,
    *,
    timeout: Optional[float] = None,
    callback: Optional[Callable[[float], None]] = None,
) -> None:
    """
    Monitor the creation of a newly created sysdiagnose archive and pull it
    :param out: filename
    :param erase: remove after pulling
    :keyword timeout: Maximum time in seconds to wait for the completion of sysdiagnose archive
        If None (default), waits indefinitely
    :keyword callback: optional callback function (form: func(float)) that accepts the elapsed time so far
    """
    start_time = time.monotonic()
    end_time = None
    if timeout is not None:
        end_time = start_time + timeout
    sysdiagnose_filename = await self._get_new_sysdiagnose_filename(end_time)

    if callback is not None:
        callback(time.monotonic() - start_time)

    self.logger.info("sysdiagnose tarball creation has been started")
    await self._wait_for_sysdiagnose_to_finish(timeout)

    if callback is not None:
        callback(time.monotonic() - start_time)

    await self.pull(out, entry=sysdiagnose_filename, erase=erase)

    if callback is not None:
        callback(time.monotonic() - start_time)

pymobiledevice3.services.mobile_image_mounter.MobileImageMounterService

Bases: LockdownService

Client for the com.apple.mobile.mobile_image_mounter lockdown service.

Provides the low-level operations for mounting and unmounting disk images (such as the Developer Disk Image) on a device: looking up mounted images, uploading image bytes, mounting and unmounting, and the personalization queries used for personalized images. Subclasses specialize the behavior for a specific image type via IMAGE_TYPE.

Implemented device-side in /usr/libexec/mobile_storage_proxy.

Source code in pymobiledevice3/services/mobile_image_mounter.py
class MobileImageMounterService(LockdownService):
    """
    Client for the ``com.apple.mobile.mobile_image_mounter`` lockdown service.

    Provides the low-level operations for mounting and unmounting disk images (such as the
    Developer Disk Image) on a device: looking up mounted images, uploading image bytes, mounting
    and unmounting, and the personalization queries used for personalized images. Subclasses
    specialize the behavior for a specific image type via `IMAGE_TYPE`.

    Implemented device-side in ``/usr/libexec/mobile_storage_proxy``.
    """

    # implemented in /usr/libexec/mobile_storage_proxy
    SERVICE_NAME = "com.apple.mobile.mobile_image_mounter"
    RSD_SERVICE_NAME = "com.apple.mobile.mobile_image_mounter.shim.remote"
    IMAGE_TYPE: Optional[str] = None

    def __init__(self, lockdown: LockdownServiceProvider):
        if isinstance(lockdown, LockdownClient):
            super().__init__(lockdown, self.SERVICE_NAME)
        else:
            super().__init__(lockdown, self.RSD_SERVICE_NAME)

    async def _send_recv(self, request: dict) -> dict:
        return await self.service.send_recv_plist(request)

    async def raise_if_cannot_mount(self) -> None:
        """
        Verify that an image of this mounter's `IMAGE_TYPE` can be mounted.

        :raises AlreadyMountedError: If an image of this type is already mounted.
        :raises DeveloperModeIsNotEnabledError: On iOS 16 and later when Developer Mode is disabled.
        """
        if await self.is_image_mounted(self.IMAGE_TYPE):
            raise AlreadyMountedError()
        if Version(self.lockdown.product_version).major >= 16 and not await self.lockdown.get_developer_mode_status():
            raise DeveloperModeIsNotEnabledError()

    async def copy_devices(self) -> list[dict]:
        """
        List the images currently mounted on the device.

        :returns: List of dictionaries, one per mounted image entry.
        :raises MessageNotSupportedError: If the device does not support the ``CopyDevices`` command.
        """
        try:
            return (await self._send_recv({"Command": "CopyDevices"}))["EntryList"]
        except KeyError as e:
            raise MessageNotSupportedError from e

    async def lookup_image(self, image_type: str) -> bytes:
        """
        Look up the signature of a mounted image by its type.

        :param image_type: Image type to look up (e.g. ``Developer`` or ``Personalized``).
        :returns: The image signature. If the device returns a list of signatures, the first is
            returned.
        :raises NotMountedError: If no image of this type is present.
        """
        response = await self._send_recv({"Command": "LookupImage", "ImageType": image_type})

        if not response or not response.get("ImagePresent", True):
            raise NotMountedError()

        signature = response.get("ImageSignature", [])
        if isinstance(signature, list):
            if not signature:
                raise NotMountedError()
            return signature[0]
        return signature

    async def is_image_mounted(self, image_type: str) -> bool:
        """
        Check whether an image of the given type is mounted.

        :param image_type: Image type to check (e.g. ``Developer`` or ``Personalized``).
        :returns: ``True`` if an image of this type is mounted, ``False`` otherwise.
        """
        try:
            await self.lookup_image(image_type)
        except NotMountedError:
            return False
        return True

    async def unmount_image(self, mount_path: str) -> None:
        """
        Unmount the image mounted at a given path (available since iOS 14.0).

        :param mount_path: Mount point of the image to unmount (e.g. ``/Developer``).
        :raises UnsupportedCommandError: If the device does not support the ``UnmountImage`` command.
        :raises NotMountedError: If no image is mounted at ``mount_path``.
        :raises InternalError: If the device reports an internal error.
        :raises PyMobileDevice3Exception: For any other error reported by the device.
        """
        request = {"Command": "UnmountImage", "MountPath": mount_path}
        response = await self._send_recv(request)

        error = response.get("Error")
        if error:
            if error == "UnknownCommand":
                raise UnsupportedCommandError()
            elif "There is no matching entry" in response.get("DetailedError", ""):
                raise NotMountedError(response)
            elif error == "InternalError":
                raise InternalError(response)
            else:
                raise PyMobileDevice3Exception(response)

    async def mount_image(self, image_type: str, signature: bytes, extras: Optional[dict] = None) -> None:
        """
        Mount an image that has already been uploaded to the device.

        :param image_type: Image type to mount (e.g. ``Developer`` or ``Personalized``).
        :param signature: Image signature (or, for personalized images, the IM4M manifest).
        :param extras: Optional additional fields merged into the ``MountImage`` request, such as
            ``ImageTrustCache`` and ``ImageInfoPlist`` for personalized images.
        :raises AlreadyMountedError: If an image of this type is already mounted.
        :raises DeveloperModeIsNotEnabledError: If the device reports Developer Mode is disabled.
        :raises PyMobileDevice3Exception: If the mount does not complete successfully.
        """

        if await self.is_image_mounted(image_type):
            raise AlreadyMountedError()

        request = {"Command": "MountImage", "ImageType": image_type, "ImageSignature": signature}

        if extras is not None:
            request.update(extras)
        response = await self._send_recv(request)

        if "Developer mode is not enabled" in response.get("DetailedError", ""):
            raise DeveloperModeIsNotEnabledError()

        status = response.get("Status")

        if status != "Complete":
            raise PyMobileDevice3Exception(f"command MountImage failed with: {response}")

    async def upload_image(self, image_type: str, image: bytes, signature: bytes) -> None:
        """
        Upload image bytes to the device in preparation for mounting.

        Issues a ``ReceiveBytes`` command, streams the image bytes once the device acknowledges, and
        waits for completion.

        :param image_type: Image type being uploaded (e.g. ``Developer`` or ``Personalized``).
        :param image: Raw image bytes to upload.
        :param signature: Image signature (or, for personalized images, the IM4M manifest).
        :raises PyMobileDevice3Exception: If the device does not acknowledge or does not complete
            the transfer.
        """
        result = await self.service.send_recv_plist({
            "Command": "ReceiveBytes",
            "ImageType": image_type,
            "ImageSize": len(image),
            "ImageSignature": signature,
        })

        status = result.get("Status")

        if status != "ReceiveBytesAck":
            raise PyMobileDevice3Exception(f"command ReceiveBytes failed with: {result}")

        await self.service.sendall(image)
        result = await self.service.recv_plist()

        status = result.get("Status")

        if status != "Complete":
            raise PyMobileDevice3Exception(f"command ReceiveBytes failed to send bytes with: {result}")

    async def query_developer_mode_status(self) -> bool:
        """
        Query whether Developer Mode is enabled on the device.

        :returns: ``True`` if Developer Mode is enabled, ``False`` otherwise.
        :raises MessageNotSupportedError: If the device does not support this command.
        """
        response = await self._send_recv({"Command": "QueryDeveloperModeStatus"})

        try:
            return response["DeveloperModeStatus"]
        except KeyError as e:
            raise MessageNotSupportedError from e

    async def query_nonce(self, personalized_image_type: Optional[str] = None) -> bytes:
        """
        Query the personalization nonce used for image personalization.

        :param personalized_image_type: Optional image type to scope the nonce to, sent as
            ``PersonalizedImageType``.
        :returns: The personalization nonce bytes.
        :raises MessageNotSupportedError: If the device does not support this command.
        """
        request = {"Command": "QueryNonce"}
        if personalized_image_type is not None:
            request["PersonalizedImageType"] = personalized_image_type
        response = await self._send_recv(request)
        try:
            return response["PersonalizationNonce"]
        except KeyError as e:
            raise MessageNotSupportedError from e

    async def query_personalization_identifiers(self, image_type: Optional[str] = None) -> dict:
        """
        Query the device identifiers required to personalize an image (board ID, chip ID, etc.).

        :param image_type: Optional image type to scope the query to, sent as
            ``PersonalizedImageType``.
        :returns: Mapping of personalization identifiers reported by the device.
        :raises MessageNotSupportedError: If the device does not support this command.
        """
        request = {"Command": "QueryPersonalizationIdentifiers"}

        if image_type is not None:
            request["PersonalizedImageType"] = image_type

        response = await self._send_recv(request)

        try:
            return response["PersonalizationIdentifiers"]
        except KeyError as e:
            raise MessageNotSupportedError from e

    async def query_personalization_manifest(self, image_type: str, signature: bytes) -> bytes:
        """
        Fetch a personalization manifest already stored on the device for an image.

        :param image_type: Image type whose manifest is requested.
        :param signature: Signature/digest of the image being queried.
        :returns: The personalization manifest, returned by the device under ``ImageSignature`` but
            actually an IM4M.
        :raises MissingManifestError: If the device has no manifest for the image.
        """
        response = await self._send_recv({
            "Command": "QueryPersonalizationManifest",
            "PersonalizedImageType": image_type,
            "ImageType": image_type,
            "ImageSignature": signature,
        })
        try:
            # The response "ImageSignature" is actually an IM4M
            return response["ImageSignature"]
        except KeyError as e:
            raise MissingManifestError() from e

    async def roll_personalization_nonce(self) -> None:
        """
        Request the device to roll (regenerate) its personalization nonce.

        The device may close the connection while handling this command; a resulting
        `ConnectionTerminatedError` is swallowed and treated as success.
        """
        try:
            await self._send_recv({"Command": "RollPersonalizationNonce"})
        except ConnectionTerminatedError:
            return

    async def roll_cryptex_nonce(self) -> None:
        """
        Request the device to roll (regenerate) its cryptex nonce.

        The device may close the connection while handling this command; a resulting
        `ConnectionTerminatedError` is swallowed and treated as success.
        """
        try:
            await self._send_recv({"Command": "RollCryptexNonce"})
        except ConnectionTerminatedError:
            return

raise_if_cannot_mount async

raise_if_cannot_mount() -> None

Verify that an image of this mounter's IMAGE_TYPE can be mounted.

Raises:

Type Description
AlreadyMountedError

If an image of this type is already mounted.

DeveloperModeIsNotEnabledError

On iOS 16 and later when Developer Mode is disabled.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def raise_if_cannot_mount(self) -> None:
    """
    Verify that an image of this mounter's `IMAGE_TYPE` can be mounted.

    :raises AlreadyMountedError: If an image of this type is already mounted.
    :raises DeveloperModeIsNotEnabledError: On iOS 16 and later when Developer Mode is disabled.
    """
    if await self.is_image_mounted(self.IMAGE_TYPE):
        raise AlreadyMountedError()
    if Version(self.lockdown.product_version).major >= 16 and not await self.lockdown.get_developer_mode_status():
        raise DeveloperModeIsNotEnabledError()

copy_devices async

copy_devices() -> list[dict]

List the images currently mounted on the device.

Returns:

Type Description
list[dict]

List of dictionaries, one per mounted image entry.

Raises:

Type Description
MessageNotSupportedError

If the device does not support the CopyDevices command.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def copy_devices(self) -> list[dict]:
    """
    List the images currently mounted on the device.

    :returns: List of dictionaries, one per mounted image entry.
    :raises MessageNotSupportedError: If the device does not support the ``CopyDevices`` command.
    """
    try:
        return (await self._send_recv({"Command": "CopyDevices"}))["EntryList"]
    except KeyError as e:
        raise MessageNotSupportedError from e

lookup_image async

lookup_image(image_type: str) -> bytes

Look up the signature of a mounted image by its type.

Parameters:

Name Type Description Default
image_type str

Image type to look up (e.g. Developer or Personalized).

required

Returns:

Type Description
bytes

The image signature. If the device returns a list of signatures, the first is returned.

Raises:

Type Description
NotMountedError

If no image of this type is present.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def lookup_image(self, image_type: str) -> bytes:
    """
    Look up the signature of a mounted image by its type.

    :param image_type: Image type to look up (e.g. ``Developer`` or ``Personalized``).
    :returns: The image signature. If the device returns a list of signatures, the first is
        returned.
    :raises NotMountedError: If no image of this type is present.
    """
    response = await self._send_recv({"Command": "LookupImage", "ImageType": image_type})

    if not response or not response.get("ImagePresent", True):
        raise NotMountedError()

    signature = response.get("ImageSignature", [])
    if isinstance(signature, list):
        if not signature:
            raise NotMountedError()
        return signature[0]
    return signature

is_image_mounted async

is_image_mounted(image_type: str) -> bool

Check whether an image of the given type is mounted.

Parameters:

Name Type Description Default
image_type str

Image type to check (e.g. Developer or Personalized).

required

Returns:

Type Description
bool

True if an image of this type is mounted, False otherwise.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def is_image_mounted(self, image_type: str) -> bool:
    """
    Check whether an image of the given type is mounted.

    :param image_type: Image type to check (e.g. ``Developer`` or ``Personalized``).
    :returns: ``True`` if an image of this type is mounted, ``False`` otherwise.
    """
    try:
        await self.lookup_image(image_type)
    except NotMountedError:
        return False
    return True

unmount_image async

unmount_image(mount_path: str) -> None

Unmount the image mounted at a given path (available since iOS 14.0).

Parameters:

Name Type Description Default
mount_path str

Mount point of the image to unmount (e.g. /Developer).

required

Raises:

Type Description
UnsupportedCommandError

If the device does not support the UnmountImage command.

NotMountedError

If no image is mounted at mount_path.

InternalError

If the device reports an internal error.

PyMobileDevice3Exception

For any other error reported by the device.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def unmount_image(self, mount_path: str) -> None:
    """
    Unmount the image mounted at a given path (available since iOS 14.0).

    :param mount_path: Mount point of the image to unmount (e.g. ``/Developer``).
    :raises UnsupportedCommandError: If the device does not support the ``UnmountImage`` command.
    :raises NotMountedError: If no image is mounted at ``mount_path``.
    :raises InternalError: If the device reports an internal error.
    :raises PyMobileDevice3Exception: For any other error reported by the device.
    """
    request = {"Command": "UnmountImage", "MountPath": mount_path}
    response = await self._send_recv(request)

    error = response.get("Error")
    if error:
        if error == "UnknownCommand":
            raise UnsupportedCommandError()
        elif "There is no matching entry" in response.get("DetailedError", ""):
            raise NotMountedError(response)
        elif error == "InternalError":
            raise InternalError(response)
        else:
            raise PyMobileDevice3Exception(response)

mount_image async

mount_image(image_type: str, signature: bytes, extras: Optional[dict] = None) -> None

Mount an image that has already been uploaded to the device.

Parameters:

Name Type Description Default
image_type str

Image type to mount (e.g. Developer or Personalized).

required
signature bytes

Image signature (or, for personalized images, the IM4M manifest).

required
extras Optional[dict]

Optional additional fields merged into the MountImage request, such as ImageTrustCache and ImageInfoPlist for personalized images.

None

Raises:

Type Description
AlreadyMountedError

If an image of this type is already mounted.

DeveloperModeIsNotEnabledError

If the device reports Developer Mode is disabled.

PyMobileDevice3Exception

If the mount does not complete successfully.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def mount_image(self, image_type: str, signature: bytes, extras: Optional[dict] = None) -> None:
    """
    Mount an image that has already been uploaded to the device.

    :param image_type: Image type to mount (e.g. ``Developer`` or ``Personalized``).
    :param signature: Image signature (or, for personalized images, the IM4M manifest).
    :param extras: Optional additional fields merged into the ``MountImage`` request, such as
        ``ImageTrustCache`` and ``ImageInfoPlist`` for personalized images.
    :raises AlreadyMountedError: If an image of this type is already mounted.
    :raises DeveloperModeIsNotEnabledError: If the device reports Developer Mode is disabled.
    :raises PyMobileDevice3Exception: If the mount does not complete successfully.
    """

    if await self.is_image_mounted(image_type):
        raise AlreadyMountedError()

    request = {"Command": "MountImage", "ImageType": image_type, "ImageSignature": signature}

    if extras is not None:
        request.update(extras)
    response = await self._send_recv(request)

    if "Developer mode is not enabled" in response.get("DetailedError", ""):
        raise DeveloperModeIsNotEnabledError()

    status = response.get("Status")

    if status != "Complete":
        raise PyMobileDevice3Exception(f"command MountImage failed with: {response}")

upload_image async

upload_image(image_type: str, image: bytes, signature: bytes) -> None

Upload image bytes to the device in preparation for mounting.

Issues a ReceiveBytes command, streams the image bytes once the device acknowledges, and waits for completion.

Parameters:

Name Type Description Default
image_type str

Image type being uploaded (e.g. Developer or Personalized).

required
image bytes

Raw image bytes to upload.

required
signature bytes

Image signature (or, for personalized images, the IM4M manifest).

required

Raises:

Type Description
PyMobileDevice3Exception

If the device does not acknowledge or does not complete the transfer.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def upload_image(self, image_type: str, image: bytes, signature: bytes) -> None:
    """
    Upload image bytes to the device in preparation for mounting.

    Issues a ``ReceiveBytes`` command, streams the image bytes once the device acknowledges, and
    waits for completion.

    :param image_type: Image type being uploaded (e.g. ``Developer`` or ``Personalized``).
    :param image: Raw image bytes to upload.
    :param signature: Image signature (or, for personalized images, the IM4M manifest).
    :raises PyMobileDevice3Exception: If the device does not acknowledge or does not complete
        the transfer.
    """
    result = await self.service.send_recv_plist({
        "Command": "ReceiveBytes",
        "ImageType": image_type,
        "ImageSize": len(image),
        "ImageSignature": signature,
    })

    status = result.get("Status")

    if status != "ReceiveBytesAck":
        raise PyMobileDevice3Exception(f"command ReceiveBytes failed with: {result}")

    await self.service.sendall(image)
    result = await self.service.recv_plist()

    status = result.get("Status")

    if status != "Complete":
        raise PyMobileDevice3Exception(f"command ReceiveBytes failed to send bytes with: {result}")

query_developer_mode_status async

query_developer_mode_status() -> bool

Query whether Developer Mode is enabled on the device.

Returns:

Type Description
bool

True if Developer Mode is enabled, False otherwise.

Raises:

Type Description
MessageNotSupportedError

If the device does not support this command.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def query_developer_mode_status(self) -> bool:
    """
    Query whether Developer Mode is enabled on the device.

    :returns: ``True`` if Developer Mode is enabled, ``False`` otherwise.
    :raises MessageNotSupportedError: If the device does not support this command.
    """
    response = await self._send_recv({"Command": "QueryDeveloperModeStatus"})

    try:
        return response["DeveloperModeStatus"]
    except KeyError as e:
        raise MessageNotSupportedError from e

query_nonce async

query_nonce(personalized_image_type: Optional[str] = None) -> bytes

Query the personalization nonce used for image personalization.

Parameters:

Name Type Description Default
personalized_image_type Optional[str]

Optional image type to scope the nonce to, sent as PersonalizedImageType.

None

Returns:

Type Description
bytes

The personalization nonce bytes.

Raises:

Type Description
MessageNotSupportedError

If the device does not support this command.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def query_nonce(self, personalized_image_type: Optional[str] = None) -> bytes:
    """
    Query the personalization nonce used for image personalization.

    :param personalized_image_type: Optional image type to scope the nonce to, sent as
        ``PersonalizedImageType``.
    :returns: The personalization nonce bytes.
    :raises MessageNotSupportedError: If the device does not support this command.
    """
    request = {"Command": "QueryNonce"}
    if personalized_image_type is not None:
        request["PersonalizedImageType"] = personalized_image_type
    response = await self._send_recv(request)
    try:
        return response["PersonalizationNonce"]
    except KeyError as e:
        raise MessageNotSupportedError from e

query_personalization_identifiers async

query_personalization_identifiers(image_type: Optional[str] = None) -> dict

Query the device identifiers required to personalize an image (board ID, chip ID, etc.).

Parameters:

Name Type Description Default
image_type Optional[str]

Optional image type to scope the query to, sent as PersonalizedImageType.

None

Returns:

Type Description
dict

Mapping of personalization identifiers reported by the device.

Raises:

Type Description
MessageNotSupportedError

If the device does not support this command.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def query_personalization_identifiers(self, image_type: Optional[str] = None) -> dict:
    """
    Query the device identifiers required to personalize an image (board ID, chip ID, etc.).

    :param image_type: Optional image type to scope the query to, sent as
        ``PersonalizedImageType``.
    :returns: Mapping of personalization identifiers reported by the device.
    :raises MessageNotSupportedError: If the device does not support this command.
    """
    request = {"Command": "QueryPersonalizationIdentifiers"}

    if image_type is not None:
        request["PersonalizedImageType"] = image_type

    response = await self._send_recv(request)

    try:
        return response["PersonalizationIdentifiers"]
    except KeyError as e:
        raise MessageNotSupportedError from e

query_personalization_manifest async

query_personalization_manifest(image_type: str, signature: bytes) -> bytes

Fetch a personalization manifest already stored on the device for an image.

Parameters:

Name Type Description Default
image_type str

Image type whose manifest is requested.

required
signature bytes

Signature/digest of the image being queried.

required

Returns:

Type Description
bytes

The personalization manifest, returned by the device under ImageSignature but actually an IM4M.

Raises:

Type Description
MissingManifestError

If the device has no manifest for the image.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def query_personalization_manifest(self, image_type: str, signature: bytes) -> bytes:
    """
    Fetch a personalization manifest already stored on the device for an image.

    :param image_type: Image type whose manifest is requested.
    :param signature: Signature/digest of the image being queried.
    :returns: The personalization manifest, returned by the device under ``ImageSignature`` but
        actually an IM4M.
    :raises MissingManifestError: If the device has no manifest for the image.
    """
    response = await self._send_recv({
        "Command": "QueryPersonalizationManifest",
        "PersonalizedImageType": image_type,
        "ImageType": image_type,
        "ImageSignature": signature,
    })
    try:
        # The response "ImageSignature" is actually an IM4M
        return response["ImageSignature"]
    except KeyError as e:
        raise MissingManifestError() from e

roll_personalization_nonce async

roll_personalization_nonce() -> None

Request the device to roll (regenerate) its personalization nonce.

The device may close the connection while handling this command; a resulting ConnectionTerminatedError is swallowed and treated as success.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def roll_personalization_nonce(self) -> None:
    """
    Request the device to roll (regenerate) its personalization nonce.

    The device may close the connection while handling this command; a resulting
    `ConnectionTerminatedError` is swallowed and treated as success.
    """
    try:
        await self._send_recv({"Command": "RollPersonalizationNonce"})
    except ConnectionTerminatedError:
        return

roll_cryptex_nonce async

roll_cryptex_nonce() -> None

Request the device to roll (regenerate) its cryptex nonce.

The device may close the connection while handling this command; a resulting ConnectionTerminatedError is swallowed and treated as success.

Source code in pymobiledevice3/services/mobile_image_mounter.py
async def roll_cryptex_nonce(self) -> None:
    """
    Request the device to roll (regenerate) its cryptex nonce.

    The device may close the connection while handling this command; a resulting
    `ConnectionTerminatedError` is swallowed and treated as success.
    """
    try:
        await self._send_recv({"Command": "RollCryptexNonce"})
    except ConnectionTerminatedError:
        return

pymobiledevice3.services.bt_packet_logger.BtPacketLoggerService

Bases: LockdownService

Source code in pymobiledevice3/services/bt_packet_logger.py
class BtPacketLoggerService(LockdownService):
    SERVICE_NAME = "com.apple.bluetooth.BTPacketLogger"

    def __init__(self, lockdown: LockdownServiceProvider) -> None:
        super().__init__(lockdown, self.SERVICE_NAME)

    async def watch(self, packets_count: int = -1) -> AsyncGenerator[bytes, None]:
        packet_index = 0
        while packet_index != packets_count:
            packet_length = unpack("<H", await self.service.recvall(SERVICE_PACKET_SIZE_HEADER))[0]
            if packet_length == 0:
                continue
            yield await self.service.recvall(packet_length)
            packet_index += 1

    async def write_to_packetlogger(self, out: BinaryIO, packet_generator: AsyncIterable[bytes]) -> None:
        await write_packetlogger_stream(out, packet_generator)

    async def write_to_pcapng(self, out: BinaryIO, packet_generator: AsyncIterable[bytes]) -> None:
        tz_offset_seconds = self.lockdown.all_values.get("TimeZoneOffsetFromUTC") or 0
        await write_pcapng_stream(out, packet_generator, self.lockdown.product_version, tz_offset_seconds)