Skip to content

Commit

Permalink
add framebuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
codeskyblue committed Apr 16, 2024
1 parent 488b13b commit cfa9d09
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 29 deletions.
10 changes: 7 additions & 3 deletions adbutils/_adb.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from adbutils._proto import *
from adbutils._version import __version__

_OKAY = "OKAY"
_FAIL = "FAIL"
_OKAY = b"OKAY"
_FAIL = b"FAIL"


def _check_server(host: str, port: int) -> bool:
Expand Down Expand Up @@ -93,6 +93,10 @@ def read(self, n: int) -> bytes:
return self._read_fully(n)
except socket.timeout:
raise AdbTimeout("adb read timeout")

def read_uint32(self) -> int:
data = self.read(4)
return int.from_bytes(data, "little")

def _read_fully(self, n: int) -> bytes:
t = n
Expand Down Expand Up @@ -138,7 +142,7 @@ def read_until_close(self, encoding: str | None = "utf-8") -> Union[str, bytes]:
return content.decode(encoding, errors='replace') if encoding else content

def check_okay(self):
data = self.read_string(4)
data = self.read(4)
if data == _FAIL:
raise AdbError(self.read_string_block())
elif data == _OKAY:
Expand Down
47 changes: 47 additions & 0 deletions adbutils/_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import typing
from typing import Optional, Union

from PIL import Image

from adbutils._deprecated import DeprecatedExtension
from adbutils.install import InstallExtension
Expand Down Expand Up @@ -293,6 +294,51 @@ def reverse_list(self):
if len(parts) != 3:
continue
yield ReverseItem(*parts[1:])

def framebuffer(self) -> Image.Image:
"""Capture device screen and return PIL.Image object
Raises:
NotImplementedError
"""
# Ref: https://android.googlesource.com/platform/system/core/+/android-cts-7.0_r18/adb/framebuffer_service.cpp
# Ref: https://github.com/DeviceFarmer/adbkit/blob/c16081384ca34addbdab318bda3c76434b7538af/src/adb/command/host-transport/framebuffer.ts
c = self.open_transport()
c.send_command("framebuffer:")
c.check_okay()

version = c.read_uint32()
if version == 16:
raise NotImplementedError("Unsupported version 16")
bpp = c.read_uint32() # bits per pixel
if bpp != 24 and bpp != 32:
raise NotImplementedError("Unsupported bpp(bits per pixel)", bpp)
size = c.read_uint32()
if size == 1:
# FIXME: what is this?
size = c.read_uint32()
width = c.read_uint32()
height = c.read_uint32()
red_offset = c.read_uint32()
red_length = c.read_uint32() # always 8
blue_offset = c.read_uint32()
blue_length = c.read_uint32() # always 8
green_offset = c.read_uint32()
green_length = c.read_uint32() # always 8
alpha_offset = c.read_uint32()
alpha_length = c.read_uint32()

color_format = 'RGB'
if blue_offset == 0:
color_format = 'BGR'
if bpp == 32 or alpha_length:
color_format += 'A'

if color_format != 'RGBA' and color_format != 'RGB':
raise NotImplementedError("Unsupported color format")
buffer = c.read(size)
image = Image.frombytes(color_format, (width, height), buffer)
return image

def push(self, local: str, remote: str) -> str:
return self.adb_output("push", local, remote)
Expand Down Expand Up @@ -460,3 +506,4 @@ def __init__(
):
BaseDevice.__init__(self, client, serial, transport_id)
ScreenrecordExtension.__init__(self)
ScreenshotExtesion.__init__(self)
48 changes: 23 additions & 25 deletions adbutils/screenshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
"""

import abc
import os
import io
from typing import Optional, Union
from adbutils.sync import Sync
from adbutils._proto import WindowSize
from PIL import Image
import threading
import tempfile

try:
from PIL import UnidentifiedImageError
Expand All @@ -26,39 +25,38 @@ def sync(self) -> Sync:
pass

@abc.abstractmethod
def shell(self, cmd: str) -> str:
def shell(self, cmd: str, encoding: Optional[str]) -> Union[str, bytes]:
pass

@abc.abstractmethod
def window_size(self) -> WindowSize:
pass

@abc.abstractmethod
def framebuffer(self) -> Image.Image:
pass

class ScreenshotExtesion(AbstractDevice):
def __init__(self):
self.__framebuffer_ok = True

def screenshot(self) -> Image.Image:
""" not thread safe
Note:
screencap to file and pull is more stable then shell(stream=True)
Ref: https://github.com/openatx/adbutils/pull/78
""" Take a screenshot and return PIL.Image.Image object
"""
try:
return self.__screencap()
pil_image = self.__screencap()
if pil_image.mode == "RGBA":
pil_image = pil_image.convert("RGB")
return pil_image
except UnidentifiedImageError as e:
wsize = self.window_size()
return Image.new("RGB", wsize) # return a blank image when screenshot is not allowed


return Image.new("RGB", wsize, (220, 120, 100))

def __screencap(self) -> Image.Image:
thread_id = threading.get_native_id()
inner_tmp_path = f"/data/local/tmp/adbutils-tmp{thread_id}.png"
self.shell(["screencap", "-p", inner_tmp_path])
try:
with tempfile.TemporaryDirectory() as tmpdir:
target_path = os.path.join(tmpdir, "adbutils-tmp.png")
self.sync.pull(inner_tmp_path, target_path)
im = Image.open(target_path)
im.load()
return im.convert("RGB")
finally:
self.shell(['rm', inner_tmp_path])
if self.__framebuffer_ok:
try:
return self.framebuffer()
except NotImplementedError:
self.__framebuffer_ok = False
png_bytes = self.shell('screencap', encoding=None)
return Image.open(io.BytesIO(png_bytes))
7 changes: 6 additions & 1 deletion test_real_device/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ def test_sync_pull_file_push(device: AdbDevice, device_tmp_path, tmp_path: pathl
def test_screenshot(device: AdbDevice):
im = device.screenshot()
assert im.mode == "RGB"



def test_framebuffer(device: AdbDevice):
im = device.framebuffer()
assert im.size


def test_app_info(device: AdbDevice):
pinfo = device.app_current()
Expand Down

0 comments on commit cfa9d09

Please sign in to comment.