6165 lines
195 KiB
Python
6165 lines
195 KiB
Python
# PEDA - Python Exploit Development Assistance for GDB
|
|
#
|
|
# Copyright (C) 2012 Long Le Dinh <longld at vnsecurity.net>
|
|
#
|
|
# License: see LICENSE file for details
|
|
#
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import re
|
|
import os
|
|
import sys
|
|
import shlex
|
|
import string
|
|
import time
|
|
import signal
|
|
import traceback
|
|
import codecs
|
|
|
|
# point to absolute path of peda.py
|
|
PEDAFILE = os.path.abspath(os.path.expanduser(__file__))
|
|
if os.path.islink(PEDAFILE):
|
|
PEDAFILE = os.readlink(PEDAFILE)
|
|
sys.path.insert(0, os.path.dirname(PEDAFILE) + "/lib/")
|
|
|
|
# Use six library to provide Python 2/3 compatibility
|
|
import six
|
|
from six.moves import range
|
|
from six.moves import input
|
|
try:
|
|
import six.moves.cPickle as pickle
|
|
except ImportError:
|
|
import pickle
|
|
|
|
|
|
|
|
from skeleton import *
|
|
from shellcode import *
|
|
from utils import *
|
|
import config
|
|
from nasm import *
|
|
|
|
if sys.version_info.major == 3:
|
|
from urllib.request import urlopen
|
|
from urllib.parse import urlencode
|
|
pyversion = 3
|
|
else:
|
|
from urllib import urlopen
|
|
from urllib import urlencode
|
|
pyversion = 2
|
|
|
|
REGISTERS = {
|
|
8 : ["al", "ah", "bl", "bh", "cl", "ch", "dl", "dh"],
|
|
16: ["ax", "bx", "cx", "dx"],
|
|
32: ["eax", "ebx", "ecx", "edx", "esi", "edi", "ebp", "esp", "eip"],
|
|
64: ["rax", "rbx", "rcx", "rdx", "rsi", "rdi", "rbp", "rsp", "rip",
|
|
"r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15"]
|
|
}
|
|
|
|
###########################################################################
|
|
class PEDA(object):
|
|
"""
|
|
Class for actual functions of PEDA commands
|
|
"""
|
|
def __init__(self):
|
|
self.SAVED_COMMANDS = {} # saved GDB user's commands
|
|
|
|
|
|
####################################
|
|
# GDB Interaction / Misc Utils #
|
|
####################################
|
|
def execute(self, gdb_command):
|
|
"""
|
|
Wrapper for gdb.execute, catch the exception so it will not stop python script
|
|
|
|
Args:
|
|
- gdb_command (String)
|
|
|
|
Returns:
|
|
- True if execution succeed (Bool)
|
|
"""
|
|
try:
|
|
gdb.execute(gdb_command)
|
|
return True
|
|
except Exception as e:
|
|
if config.Option.get("debug") == "on":
|
|
msg('Exception (%s): %s' % (gdb_command, e), "red")
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
def execute_redirect(self, gdb_command, silent=False):
|
|
"""
|
|
Execute a gdb command and capture its output
|
|
|
|
Args:
|
|
- gdb_command (String)
|
|
- silent: discard command's output, redirect to /dev/null (Bool)
|
|
|
|
Returns:
|
|
- output of command (String)
|
|
"""
|
|
result = None
|
|
#init redirection
|
|
if silent:
|
|
logfd = open(os.path.devnull, "r+")
|
|
else:
|
|
logfd = tmpfile()
|
|
logname = logfd.name
|
|
gdb.execute('set logging off') # prevent nested call
|
|
gdb.execute('set height 0') # disable paging
|
|
gdb.execute('set logging file %s' % logname)
|
|
gdb.execute('set logging overwrite on')
|
|
gdb.execute('set logging redirect on')
|
|
gdb.execute('set logging on')
|
|
try:
|
|
gdb.execute(gdb_command)
|
|
gdb.flush()
|
|
gdb.execute('set logging off')
|
|
if not silent:
|
|
logfd.flush()
|
|
result = logfd.read()
|
|
logfd.close()
|
|
except Exception as e:
|
|
gdb.execute('set logging off') #to be sure
|
|
if config.Option.get("debug") == "on":
|
|
msg('Exception (%s): %s' % (gdb_command, e), "red")
|
|
traceback.print_exc()
|
|
logfd.close()
|
|
if config.Option.get("verbose") == "on":
|
|
msg(result)
|
|
return result
|
|
|
|
def parse_and_eval(self, exp):
|
|
"""
|
|
Work around implementation for gdb.parse_and_eval with enhancements
|
|
|
|
Args:
|
|
- exp: expression to evaluate (String)
|
|
|
|
Returns:
|
|
- value of expression
|
|
"""
|
|
|
|
regs = sum(REGISTERS.values(), [])
|
|
for r in regs:
|
|
if "$"+r not in exp and "e"+r not in exp and "r"+r not in exp:
|
|
exp = exp.replace(r, "$%s" % r)
|
|
|
|
p = re.compile("(.*)\[(.*)\]") # DWORD PTR [esi+eax*1]
|
|
matches = p.search(exp)
|
|
if not matches:
|
|
p = re.compile("(.*).s:(0x.*)") # DWORD PTR ds:0xdeadbeef
|
|
matches = p.search(exp)
|
|
|
|
if matches:
|
|
mod = "w"
|
|
if "BYTE" in matches.group(1):
|
|
mod = "b"
|
|
elif "QWORD" in matches.group(1):
|
|
mod = "g"
|
|
elif "DWORD" in matches.group(1):
|
|
mod = "w"
|
|
elif "WORD" in matches.group(1):
|
|
mod = "h"
|
|
|
|
out = self.execute_redirect("x/%sx %s" % (mod, matches.group(2)))
|
|
if not out:
|
|
return None
|
|
else:
|
|
return out.split(":\t")[-1].strip()
|
|
|
|
else:
|
|
out = self.execute_redirect("print %s" % exp)
|
|
if not out:
|
|
return None
|
|
else:
|
|
out = gdb.history(0).__str__()
|
|
out = out.encode('ascii', 'ignore')
|
|
out = decode_string_escape(out)
|
|
return out.strip()
|
|
|
|
def string_to_argv(self, str):
|
|
"""
|
|
Convert a string to argv list, pre-processing register and variable values
|
|
|
|
Args:
|
|
- str: input string (String)
|
|
|
|
Returns:
|
|
- argv list (List)
|
|
"""
|
|
try:
|
|
str = str.encode('ascii', 'ignore')
|
|
except:
|
|
pass
|
|
args = list(map(lambda x: decode_string_escape(x), shlex.split(str.decode())))
|
|
# need more processing here
|
|
for idx, a in enumerate(args):
|
|
a = a.strip(",")
|
|
if a.startswith("$"): # try to get register/variable value
|
|
v = self.parse_and_eval(a)
|
|
if v != None and v != "void":
|
|
if v.startswith("0x"): # int
|
|
args[idx] = v.split()[0] # workaround for 0xdeadbeef <symbol+x>
|
|
else: # string, complex data
|
|
args[idx] = v
|
|
elif a.startswith("+"): # relative value to prev arg
|
|
adder = to_int(self.parse_and_eval(a[1:]))
|
|
if adder is not None:
|
|
args[idx] = "%s" % to_hex(to_int(args[idx-1]) + adder)
|
|
elif is_math_exp(a):
|
|
try:
|
|
v = eval("%s" % a)
|
|
# XXX hack to avoid builtin functions/types
|
|
if not isinstance(v, six.string_types + six.integer_types):
|
|
continue
|
|
args[idx] = "%s" % (to_hex(v) if to_int(v) != None else v)
|
|
except:
|
|
pass
|
|
if config.Option.get("verbose") == "on":
|
|
msg(args)
|
|
return args
|
|
|
|
|
|
################################
|
|
# GDB User-Defined Helpers #
|
|
################################
|
|
def save_user_command(self, cmd):
|
|
"""
|
|
Save user-defined command and deactivate it
|
|
|
|
Args:
|
|
- cmd: user-defined command (String)
|
|
|
|
Returns:
|
|
- True if success to save (Bool)
|
|
"""
|
|
commands = self.execute_redirect("show user %s" % cmd)
|
|
if not commands:
|
|
return False
|
|
|
|
commands = "\n".join(commands.splitlines()[1:])
|
|
commands = "define %s\n" % cmd + commands + "end\n"
|
|
self.SAVED_COMMANDS[cmd] = commands
|
|
tmp = tmpfile()
|
|
tmp.write("define %s\nend\n" % cmd)
|
|
tmp.flush()
|
|
result = self.execute("source %s" % tmp.name)
|
|
tmp.close()
|
|
return result
|
|
|
|
def define_user_command(self, cmd, code):
|
|
"""
|
|
Define a user-defined command, overwrite the old content
|
|
|
|
Args:
|
|
- cmd: user-defined command (String)
|
|
- code: gdb script code to append (String)
|
|
|
|
Returns:
|
|
- True if success to define (Bool)
|
|
"""
|
|
commands = "define %s\n" % cmd + code + "\nend\n"
|
|
tmp = tmpfile(is_binary_file=False)
|
|
tmp.write(commands)
|
|
tmp.flush()
|
|
result = self.execute("source %s" % tmp.name)
|
|
tmp.close()
|
|
return result
|
|
|
|
def append_user_command(self, cmd, code):
|
|
"""
|
|
Append code to a user-defined command, define new command if not exist
|
|
|
|
Args:
|
|
- cmd: user-defined command (String)
|
|
- code: gdb script code to append (String)
|
|
|
|
Returns:
|
|
- True if success to append (Bool)
|
|
"""
|
|
|
|
commands = self.execute_redirect("show user %s" % cmd)
|
|
if not commands:
|
|
return self.define_user_command(cmd, code)
|
|
# else
|
|
commands = "\n".join(commands.splitlines()[1:])
|
|
if code in commands:
|
|
return True
|
|
|
|
commands = "define %s\n" % cmd + commands + code + "\nend\n"
|
|
tmp = tmpfile()
|
|
tmp.write(commands)
|
|
tmp.flush()
|
|
result = self.execute("source %s" % tmp.name)
|
|
tmp.close()
|
|
return result
|
|
|
|
def restore_user_command(self, cmd):
|
|
"""
|
|
Restore saved user-defined command
|
|
|
|
Args:
|
|
- cmd: user-defined command (String)
|
|
|
|
Returns:
|
|
- True if success to restore (Bool)
|
|
"""
|
|
if cmd == "all":
|
|
commands = "\n".join(self.SAVED_COMMANDS.values())
|
|
self.SAVED_COMMANDS = {}
|
|
else:
|
|
if cmd not in self.SAVED_COMMANDS:
|
|
return False
|
|
else:
|
|
commands = self.SAVED_COMMANDS[cmd]
|
|
self.SAVED_COMMANDS.pop(cmd)
|
|
tmp = tmpfile()
|
|
tmp.write(commands)
|
|
tmp.flush()
|
|
result = self.execute("source %s" % tmp.name)
|
|
tmp.close()
|
|
|
|
return result
|
|
|
|
def run_gdbscript_code(self, code):
|
|
"""
|
|
Run basic gdbscript code as it is typed in interactively
|
|
|
|
Args:
|
|
- code: gdbscript code, lines are splitted by "\n" or ";" (String)
|
|
|
|
Returns:
|
|
- True if success to run (Bool)
|
|
"""
|
|
tmp = tmpfile()
|
|
tmp.write(code.replace(";", "\n"))
|
|
tmp.flush()
|
|
result = self.execute("source %s" % tmp.name)
|
|
tmp.close()
|
|
return result
|
|
|
|
#########################
|
|
# Debugging Helpers #
|
|
#########################
|
|
@memoized
|
|
def is_target_remote(self):
|
|
"""
|
|
Check if current target is remote
|
|
|
|
Returns:
|
|
- True if target is remote (Bool)
|
|
"""
|
|
out = self.execute_redirect("info program")
|
|
if out and "serial line" in out: # remote target
|
|
return True
|
|
|
|
return False
|
|
|
|
@memoized
|
|
def getfile(self):
|
|
"""
|
|
Get exec file of debugged program
|
|
|
|
Returns:
|
|
- full path to executable file (String)
|
|
"""
|
|
result = None
|
|
out = self.execute_redirect('info files')
|
|
if out and '"' in out:
|
|
p = re.compile(".*exec file:\s*`(.*)'")
|
|
m = p.search(out)
|
|
if m:
|
|
result = m.group(1)
|
|
else: # stripped file, get symbol file
|
|
p = re.compile("Symbols from \"([^\"]*)")
|
|
m = p.search(out)
|
|
if m:
|
|
result = m.group(1)
|
|
|
|
return result
|
|
|
|
def get_status(self):
|
|
"""
|
|
Get execution status of debugged program
|
|
|
|
Returns:
|
|
- current status of program (String)
|
|
STOPPED - not being run
|
|
BREAKPOINT - breakpoint hit
|
|
SIGXXX - stopped by signal XXX
|
|
UNKNOWN - unknown, not implemented
|
|
"""
|
|
status = "UNKNOWN"
|
|
out = self.execute_redirect("info program")
|
|
for line in out.splitlines():
|
|
if line.startswith("It stopped"):
|
|
if "signal" in line: # stopped by signal
|
|
status = line.split("signal")[1].split(",")[0].strip()
|
|
break
|
|
if "breakpoint" in line: # breakpoint hit
|
|
status = "BREAKPOINT"
|
|
break
|
|
if "not being run" in line:
|
|
status = "STOPPED"
|
|
break
|
|
return status
|
|
|
|
@memoized
|
|
def getpid(self):
|
|
"""
|
|
Get PID of the debugged process
|
|
|
|
Returns:
|
|
- pid (Int)
|
|
"""
|
|
|
|
out = None
|
|
status = self.get_status()
|
|
if not status or status == "STOPPED":
|
|
return None
|
|
pid = gdb.selected_inferior().pid
|
|
return int(pid) if pid else None
|
|
|
|
def getos(self):
|
|
"""
|
|
Get running OS info
|
|
|
|
Returns:
|
|
- os version (String)
|
|
"""
|
|
# TODO: get remote os by calling uname()
|
|
return os.uname()[0]
|
|
|
|
@memoized
|
|
def getarch(self):
|
|
"""
|
|
Get architecture of debugged program
|
|
|
|
Returns:
|
|
- tuple of architecture info (arch (String), bits (Int))
|
|
"""
|
|
arch = "unknown"
|
|
bits = 32
|
|
out = self.execute_redirect('maintenance info sections ?').splitlines()
|
|
for line in out:
|
|
if "file type" in line:
|
|
arch = line.split()[-1][:-1]
|
|
break
|
|
if "64" in arch:
|
|
bits = 64
|
|
return (arch, bits)
|
|
|
|
def intsize(self):
|
|
"""
|
|
Get dword size of debugged program
|
|
|
|
Returns:
|
|
- size (Int)
|
|
+ intsize = 4/8 for 32/64-bits arch
|
|
"""
|
|
|
|
(arch, bits) = self.getarch()
|
|
return bits // 8
|
|
|
|
def getregs(self, reglist=None):
|
|
"""
|
|
Get value of some or all registers
|
|
|
|
Returns:
|
|
- dictionary of {regname(String) : value(Int)}
|
|
"""
|
|
if reglist:
|
|
reglist = reglist.replace(",", " ")
|
|
else:
|
|
reglist = ""
|
|
regs = self.execute_redirect("info registers %s" % reglist)
|
|
if not regs:
|
|
return None
|
|
|
|
result = {}
|
|
if regs:
|
|
for r in regs.splitlines():
|
|
r = r.split()
|
|
if len(r) > 1 and to_int(r[1]) is not None:
|
|
result[r[0]] = to_int(r[1])
|
|
|
|
return result
|
|
|
|
def getreg(self, register):
|
|
"""
|
|
Get value of a specific register
|
|
|
|
Args:
|
|
- register: register name (String)
|
|
|
|
Returns:
|
|
- register value (Int)
|
|
"""
|
|
r = register.lower()
|
|
regs = self.execute_redirect("info registers %s" % r)
|
|
if regs:
|
|
regs = regs.splitlines()
|
|
if len(regs) > 1:
|
|
return None
|
|
else:
|
|
result = to_int(regs[0].split()[1])
|
|
return result
|
|
|
|
return None
|
|
|
|
def set_breakpoint(self, location, temp=0, hard=0):
|
|
"""
|
|
Wrapper for GDB break command
|
|
- location: target function or address (String ot Int)
|
|
|
|
Returns:
|
|
- True if can set breakpoint
|
|
"""
|
|
cmd = "break"
|
|
if hard:
|
|
cmd = "h" + cmd
|
|
if temp:
|
|
cmd = "t" + cmd
|
|
|
|
if to_int(location) is not None:
|
|
return peda.execute("%s *0x%x" % (cmd, to_int(location)))
|
|
else:
|
|
return peda.execute("%s %s" % (cmd, location))
|
|
|
|
def get_breakpoint(self, num):
|
|
"""
|
|
Get info of a specific breakpoint
|
|
TODO: support catchpoint, watchpoint
|
|
|
|
Args:
|
|
- num: breakpoint number
|
|
|
|
Returns:
|
|
- tuple (Num(Int), Type(String), Disp(Bool), Enb(Bool), Address(Int), What(String), commands(String))
|
|
"""
|
|
out = self.execute_redirect("info breakpoints %d" % num)
|
|
if not out or "No breakpoint" in out:
|
|
return None
|
|
|
|
lines = out.splitlines()[1:]
|
|
# breakpoint regex
|
|
p = re.compile("^(\d*)\s*(.*breakpoint)\s*(keep|del)\s*(y|n)\s*(0x[^ ]*)\s*(.*)")
|
|
m = p.match(lines[0])
|
|
if not m:
|
|
# catchpoint/watchpoint regex
|
|
p = re.compile("^(\d*)\s*(.*point)\s*(keep|del)\s*(y|n)\s*(.*)")
|
|
m = p.match(lines[0])
|
|
if not m:
|
|
return None
|
|
else:
|
|
(num, type, disp, enb, what) = m.groups()
|
|
addr = ''
|
|
else:
|
|
(num, type, disp, enb, addr, what) = m.groups()
|
|
|
|
disp = True if disp == "keep" else False
|
|
enb = True if enb == "y" else False
|
|
addr = to_int(addr)
|
|
m = re.match("in.*at(.*:\d*)", what)
|
|
if m:
|
|
what = m.group(1)
|
|
else:
|
|
if addr: # breakpoint
|
|
what = ""
|
|
|
|
commands = ""
|
|
if len(lines) > 1:
|
|
for line in lines[1:]:
|
|
if "already hit" in line: continue
|
|
commands += line + "\n"
|
|
|
|
return (num, type, disp, enb, addr, what, commands.rstrip())
|
|
|
|
def get_breakpoints(self):
|
|
"""
|
|
Get list of current breakpoints
|
|
|
|
Returns:
|
|
- list of tuple (Num(Int), Type(String), Disp(Bool), Nnb(Bool), Address(Int), commands(String))
|
|
"""
|
|
result = []
|
|
out = self.execute_redirect("info breakpoints")
|
|
if not out:
|
|
return []
|
|
|
|
bplist = []
|
|
for line in out.splitlines():
|
|
m = re.match("^(\d*).*", line)
|
|
if m and to_int(m.group(1)):
|
|
bplist += [to_int(m.group(1))]
|
|
|
|
for num in bplist:
|
|
r = self.get_breakpoint(num)
|
|
if r:
|
|
result += [r]
|
|
return result
|
|
|
|
def save_breakpoints(self, filename):
|
|
"""
|
|
Save current breakpoints to file as a script
|
|
|
|
Args:
|
|
- filename: target file (String)
|
|
|
|
Returns:
|
|
- True if success to save (Bool)
|
|
"""
|
|
# use built-in command for gdb 7.2+
|
|
result = self.execute_redirect("save breakpoints %s" % filename)
|
|
if result == '':
|
|
return True
|
|
|
|
bplist = self.get_breakpoints()
|
|
if not bplist:
|
|
return False
|
|
|
|
try:
|
|
fd = open(filename, "w")
|
|
for (num, type, disp, enb, addr, what, commands) in bplist:
|
|
m = re.match("(.*)point", type)
|
|
if m:
|
|
cmd = m.group(1).split()[-1]
|
|
else:
|
|
cmd = "break"
|
|
if "hw" in type and cmd == "break":
|
|
cmd = "h" + cmd
|
|
if "read" in type:
|
|
cmd = "r" + cmd
|
|
if "acc" in type:
|
|
cmd = "a" + cmd
|
|
|
|
if not disp:
|
|
cmd = "t" + cmd
|
|
if what:
|
|
location = what
|
|
else:
|
|
location = "*0x%x" % addr
|
|
text = "%s %s" % (cmd, location)
|
|
if commands:
|
|
if "stop only" not in commands:
|
|
text += "\ncommands\n%s\nend" % commands
|
|
else:
|
|
text += commands.split("stop only", 1)[1]
|
|
fd.write(text + "\n")
|
|
fd.close()
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def get_config_filename(self, name):
|
|
filename = peda.getfile()
|
|
if not filename:
|
|
filename = peda.getpid()
|
|
if not filename:
|
|
filename = 'unknown'
|
|
|
|
filename = os.path.basename("%s" % filename)
|
|
tmpl_name = config.Option.get(name)
|
|
if tmpl_name:
|
|
return tmpl_name.replace("#FILENAME#", filename)
|
|
else:
|
|
return "peda-%s-%s" % (name, filename)
|
|
|
|
def save_session(self, filename=None):
|
|
"""
|
|
Save current working gdb session to file as a script
|
|
|
|
Args:
|
|
- filename: target file (String)
|
|
|
|
Returns:
|
|
- True if success to save (Bool)
|
|
"""
|
|
session = ""
|
|
if not filename:
|
|
filename = self.get_config_filename("session")
|
|
|
|
# exec-wrapper
|
|
out = self.execute_redirect("show exec-wrapper")
|
|
wrapper = out.split('"')[1]
|
|
if wrapper:
|
|
session += "set exec-wrapper %s\n" % wrapper
|
|
|
|
try:
|
|
# save breakpoints
|
|
self.save_breakpoints(filename)
|
|
fd = open(filename, "a+")
|
|
fd.write("\n" + session)
|
|
fd.close()
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def restore_session(self, filename=None):
|
|
"""
|
|
Restore previous saved working gdb session from file
|
|
|
|
Args:
|
|
- filename: source file (String)
|
|
|
|
Returns:
|
|
- True if success to restore (Bool)
|
|
"""
|
|
if not filename:
|
|
filename = self.get_config_filename("session")
|
|
|
|
# temporarily save and clear breakpoints
|
|
tmp = tmpfile()
|
|
self.save_breakpoints(tmp.name)
|
|
self.execute("delete")
|
|
result = self.execute("source %s" % filename)
|
|
if not result:
|
|
self.execute("source %s" % tmp.name)
|
|
tmp.close()
|
|
return result
|
|
|
|
@memoized
|
|
def assemble(self, asmcode, bits=None):
|
|
"""
|
|
Assemble ASM instructions using NASM
|
|
- asmcode: input ASM instructions, multiple instructions are separated by ";" (String)
|
|
|
|
Returns:
|
|
- bin code (raw bytes)
|
|
"""
|
|
if bits is None:
|
|
(arch, bits) = self.getarch()
|
|
return Nasm.assemble(asmcode, bits)
|
|
|
|
def disassemble(self, *arg):
|
|
"""
|
|
Wrapper for disassemble command
|
|
- arg: args for disassemble command
|
|
|
|
Returns:
|
|
- text code (String)
|
|
"""
|
|
code = ""
|
|
modif = ""
|
|
arg = list(arg)
|
|
if len(arg) > 1:
|
|
if "/" in arg[0]:
|
|
modif = arg[0]
|
|
arg = arg[1:]
|
|
if len(arg) == 1 and to_int(arg[0]) != None:
|
|
arg += [to_hex(to_int(arg[0]) + 32)]
|
|
|
|
self.execute("set disassembly-flavor intel")
|
|
out = self.execute_redirect("disassemble %s %s" % (modif, ",".join(arg)))
|
|
if not out:
|
|
return None
|
|
else:
|
|
code = out
|
|
|
|
return code
|
|
|
|
@memoized
|
|
def prev_inst(self, address, count=1):
|
|
"""
|
|
Get previous instructions at an address
|
|
|
|
Args:
|
|
- address: address to get previous instruction (Int)
|
|
- count: number of instructions to read (Int)
|
|
|
|
Returns:
|
|
- list of tuple (address(Int), code(String))
|
|
"""
|
|
result = []
|
|
backward = 64+16*count
|
|
for i in range(backward):
|
|
if self.getpid() and not self.is_address(address-backward+i):
|
|
continue
|
|
|
|
code = self.execute_redirect("disassemble %s, %s" % (to_hex(address-backward+i), to_hex(address+1)))
|
|
if code and ("%x" % address) in code:
|
|
lines = code.strip().splitlines()[1:-1]
|
|
if len(lines) > count and "(bad)" not in " ".join(lines):
|
|
for line in lines[-count-1:-1]:
|
|
(addr, code) = line.split(":", 1)
|
|
addr = re.search("(0x[^ ]*)", addr).group(1)
|
|
result += [(to_int(addr), code)]
|
|
return result
|
|
return None
|
|
|
|
@memoized
|
|
def current_inst(self, address):
|
|
"""
|
|
Parse instruction at an address
|
|
|
|
Args:
|
|
- address: address to get next instruction (Int)
|
|
|
|
Returns:
|
|
- tuple of (address(Int), code(String))
|
|
"""
|
|
out = self.execute_redirect("x/i 0x%x" % address)
|
|
if not out:
|
|
return None
|
|
|
|
(addr, code) = out.split(":", 1)
|
|
addr = re.search("(0x[^ ]*)", addr).group(1)
|
|
addr = to_int(addr)
|
|
code = code.strip()
|
|
|
|
return (addr, code)
|
|
|
|
@memoized
|
|
def next_inst(self, address, count=1):
|
|
"""
|
|
Get next instructions at an address
|
|
|
|
Args:
|
|
- address: address to get next instruction (Int)
|
|
- count: number of instructions to read (Int)
|
|
|
|
Returns:
|
|
- - list of tuple (address(Int), code(String))
|
|
"""
|
|
result = []
|
|
code = self.execute_redirect("x/%di 0x%x" % (count+1, address))
|
|
if not code:
|
|
return None
|
|
|
|
lines = code.strip().splitlines()
|
|
for i in range(1, count+1):
|
|
(addr, code) = lines[i].split(":", 1)
|
|
addr = re.search("(0x[^ ]*)", addr).group(1)
|
|
result += [(to_int(addr), code)]
|
|
return result
|
|
|
|
@memoized
|
|
def disassemble_around(self, address, count=8):
|
|
"""
|
|
Disassemble instructions nearby current PC or an address
|
|
|
|
Args:
|
|
- address: start address to disassemble around (Int)
|
|
- count: number of instructions to disassemble
|
|
|
|
Returns:
|
|
- text code (String)
|
|
"""
|
|
count = min(count, 256)
|
|
pc = address
|
|
if pc is None:
|
|
return None
|
|
|
|
# check if address is reachable
|
|
if not self.execute_redirect("x/x 0x%x" % pc):
|
|
return None
|
|
|
|
prev_code = self.prev_inst(pc, count//2-1)
|
|
if prev_code:
|
|
start = prev_code[0][0]
|
|
else:
|
|
start = pc
|
|
if start == pc:
|
|
count = count//2
|
|
|
|
code = self.execute_redirect("x/%di 0x%x" % (count, start))
|
|
if "0x%x" % pc not in code:
|
|
code = self.execute_redirect("x/%di 0x%x" % (count//2, pc))
|
|
|
|
return code.rstrip()
|
|
|
|
@memoized
|
|
def xrefs(self, search="", filename=None):
|
|
"""
|
|
Search for all call references or data access to a function/variable
|
|
|
|
Args:
|
|
- search: function or variable to search for (String)
|
|
- filename: binary/library to search (String)
|
|
|
|
Returns:
|
|
- list of tuple (address(Int), asm instruction(String))
|
|
"""
|
|
result = []
|
|
if not filename:
|
|
filename = self.getfile()
|
|
|
|
if not filename:
|
|
return None
|
|
vmap = self.get_vmmap(filename)
|
|
elfbase = vmap[0][0] if vmap else 0
|
|
|
|
if to_int(search) is not None:
|
|
search = "%x" % to_int(search)
|
|
|
|
search_data = 1
|
|
if search == "":
|
|
search_data = 0
|
|
|
|
out = execute_external_command("%s -M intel -z --prefix-address -d '%s' | grep '%s'" % (config.OBJDUMP, filename, search))
|
|
|
|
for line in out.splitlines():
|
|
if not line: continue
|
|
addr = to_int("0x" + line.split()[0].strip())
|
|
if not addr: continue
|
|
|
|
# update with runtime values
|
|
if addr < elfbase:
|
|
addr += elfbase
|
|
out = self.execute_redirect("x/i 0x%x" % addr)
|
|
if out:
|
|
line = out
|
|
p = re.compile("\s*(0x[^ ]*).*?:\s*([^ ]*)\s*(.*)")
|
|
else:
|
|
p = re.compile("(.*?)\s*<.*?>\s*([^ ]*)\s*(.*)")
|
|
|
|
m = p.search(line)
|
|
if m:
|
|
(address, opcode, opers) = m.groups()
|
|
if "call" in opcode and search in opers:
|
|
result += [(addr, line.strip())]
|
|
if search_data:
|
|
if "mov" in opcode and search in opers:
|
|
result += [(addr, line.strip())]
|
|
|
|
return result
|
|
|
|
def _get_function_args_32(self, code, argc=None):
|
|
"""
|
|
Guess the number of arguments passed to a function - i386
|
|
"""
|
|
if not argc:
|
|
argc = 0
|
|
p = re.compile(".*mov.*\[esp(.*)\],")
|
|
matches = p.findall(code)
|
|
if matches:
|
|
l = len(matches)
|
|
for v in matches:
|
|
if v.startswith("+"):
|
|
offset = to_int(v[1:])
|
|
if offset is not None and (offset//4) > l:
|
|
continue
|
|
argc += 1
|
|
else: # try with push style
|
|
argc = code.count("push")
|
|
|
|
argc = min(argc, 6)
|
|
if argc == 0:
|
|
return []
|
|
|
|
args = []
|
|
sp = self.getreg("sp")
|
|
mem = self.dumpmem(sp, sp+4*argc)
|
|
for i in range(argc):
|
|
args += [struct.unpack("<L", mem[i*4:(i+1)*4])[0]]
|
|
|
|
return args
|
|
|
|
def _get_function_args_64(self, code, argc=None):
|
|
"""
|
|
Guess the number of arguments passed to a function - x86_64
|
|
"""
|
|
|
|
# just retrieve max 6 args
|
|
arg_order = ["rdi", "rsi", "rdx", "rcx", "r8", "r9"]
|
|
p = re.compile(":\s*([^ ]*)\s*(.*),")
|
|
matches = p.findall(code)
|
|
regs = [r for (_, r) in matches]
|
|
p = re.compile(("di|si|dx|cx|r8|r9"))
|
|
m = p.findall(" ".join(regs))
|
|
m = list(set(m)) # uniqify
|
|
argc = 0
|
|
if "si" in m and "di" not in m: # dirty fix
|
|
argc += 1
|
|
argc += m.count("di")
|
|
if argc > 0:
|
|
argc += m.count("si")
|
|
if argc > 1:
|
|
argc += m.count("dx")
|
|
if argc > 2:
|
|
argc += m.count("cx")
|
|
if argc > 3:
|
|
argc += m.count("r8")
|
|
if argc > 4:
|
|
argc += m.count("r9")
|
|
|
|
if argc == 0:
|
|
return []
|
|
|
|
args = []
|
|
regs = self.getregs()
|
|
for i in range(argc):
|
|
args += [regs[arg_order[i]]]
|
|
|
|
return args
|
|
|
|
def get_function_args(self, argc=None):
|
|
"""
|
|
Get the guessed arguments passed to a function when stopped at a call instruction
|
|
|
|
Args:
|
|
- argc: force to get specific number of arguments (Int)
|
|
|
|
Returns:
|
|
- list of arguments (List)
|
|
"""
|
|
|
|
args = []
|
|
regs = self.getregs()
|
|
if regs is None:
|
|
return []
|
|
|
|
(arch, bits) = self.getarch()
|
|
pc = self.getreg("pc")
|
|
prev_insts = self.prev_inst(pc, 12)
|
|
|
|
code = ""
|
|
if not prev_insts:
|
|
return []
|
|
|
|
for (addr, inst) in prev_insts[::-1]:
|
|
if "call" in inst.strip().split()[0]:
|
|
break
|
|
code = "0x%x:%s\n" % (addr, inst) + code
|
|
|
|
if "i386" in arch:
|
|
args = self._get_function_args_32(code, argc)
|
|
if "64" in arch:
|
|
args = self._get_function_args_64(code, argc)
|
|
|
|
return args
|
|
|
|
@memoized
|
|
def backtrace_depth(self, sp=None):
|
|
"""
|
|
Get number of frames in backtrace
|
|
|
|
Args:
|
|
- sp: stack pointer address, for caching (Int)
|
|
|
|
Returns:
|
|
- depth: number of frames (Int)
|
|
"""
|
|
backtrace = self.execute_redirect("backtrace")
|
|
return backtrace.count("#")
|
|
|
|
def stepuntil(self, inst, mapname=None, depth=None):
|
|
"""
|
|
Step execution until next "inst" instruction within a specific memory range
|
|
|
|
Args:
|
|
- inst: the instruction to reach (String)
|
|
- mapname: name of virtual memory region to check for the instruction (String)
|
|
- depth: backtrace depth (Int)
|
|
|
|
Returns:
|
|
- tuple of (depth, instruction)
|
|
+ depth: current backtrace depth (Int)
|
|
+ instruction: current instruction (String)
|
|
"""
|
|
|
|
if not self.getpid():
|
|
return None
|
|
|
|
maxdepth = to_int(config.Option.get("tracedepth"))
|
|
if not maxdepth:
|
|
maxdepth = 0xffffffff
|
|
|
|
maps = self.get_vmmap()
|
|
binname = self.getfile()
|
|
if mapname is None:
|
|
mapname = binname
|
|
mapname = mapname.replace(" ", "").split(",") + [binname]
|
|
targetmap = []
|
|
for m in mapname:
|
|
targetmap += self.get_vmmap(m)
|
|
binmap = self.get_vmmap("binary")
|
|
|
|
current_instruction = ""
|
|
pc = self.getreg("pc")
|
|
|
|
if depth is None:
|
|
current_depth = self.backtrace_depth(self.getreg("sp"))
|
|
else:
|
|
current_depth = depth
|
|
old_status = self.get_status()
|
|
|
|
while True:
|
|
status = self.get_status()
|
|
if status != old_status:
|
|
if "SIG" in status and status[3:] not in ["TRAP"] and not to_int(status[3:]): # ignore TRAP and numbered signals
|
|
current_instruction = "Interrupted: %s" % status
|
|
call_depth = current_depth
|
|
break
|
|
if "STOP" in status:
|
|
current_instruction = "End of execution"
|
|
call_depth = current_depth
|
|
break
|
|
|
|
call_depth = self.backtrace_depth(self.getreg("sp"))
|
|
current_instruction = self.execute_redirect("x/i $pc")
|
|
if not current_instruction:
|
|
current_instruction = "End of execution"
|
|
break
|
|
|
|
p = re.compile(".*?(0x[^ :]*)")
|
|
addr = p.search(current_instruction).group(1)
|
|
addr = to_int(addr)
|
|
if addr is None:
|
|
break
|
|
|
|
#p = re.compile(".*?:\s*([^ ]*)")
|
|
p = re.compile(".*?:\s*(.*)")
|
|
code = p.match(current_instruction).group(1)
|
|
found = 0
|
|
for i in inst.replace(",", " ").split():
|
|
if re.match(i.strip(), code.strip()):
|
|
if self.is_address(addr, targetmap) and addr != pc:
|
|
found = 1
|
|
break
|
|
if found != 0:
|
|
break
|
|
self.execute_redirect("stepi", silent=True)
|
|
if not self.is_address(addr, targetmap) or call_depth > maxdepth:
|
|
self.execute_redirect("finish", silent=True)
|
|
pc = 0
|
|
|
|
return (call_depth - current_depth, current_instruction.strip())
|
|
|
|
def get_eflags(self):
|
|
"""
|
|
Get flags value from EFLAGS register
|
|
|
|
Returns:
|
|
- dictionary of named flags
|
|
"""
|
|
|
|
# Eflags bit masks, source vdb
|
|
EFLAGS_CF = 1 << 0
|
|
EFLAGS_PF = 1 << 2
|
|
EFLAGS_AF = 1 << 4
|
|
EFLAGS_ZF = 1 << 6
|
|
EFLAGS_SF = 1 << 7
|
|
EFLAGS_TF = 1 << 8
|
|
EFLAGS_IF = 1 << 9
|
|
EFLAGS_DF = 1 << 10
|
|
EFLAGS_OF = 1 << 11
|
|
|
|
flags = {"CF":0, "PF":0, "AF":0, "ZF":0, "SF":0, "TF":0, "IF":0, "DF":0, "OF":0}
|
|
eflags = self.getreg("eflags")
|
|
if not eflags:
|
|
return None
|
|
flags["CF"] = bool(eflags & EFLAGS_CF)
|
|
flags["PF"] = bool(eflags & EFLAGS_PF)
|
|
flags["AF"] = bool(eflags & EFLAGS_AF)
|
|
flags["ZF"] = bool(eflags & EFLAGS_ZF)
|
|
flags["SF"] = bool(eflags & EFLAGS_SF)
|
|
flags["TF"] = bool(eflags & EFLAGS_TF)
|
|
flags["IF"] = bool(eflags & EFLAGS_IF)
|
|
flags["DF"] = bool(eflags & EFLAGS_DF)
|
|
flags["OF"] = bool(eflags & EFLAGS_OF)
|
|
|
|
return flags
|
|
|
|
def set_eflags(self, flagname, value):
|
|
"""
|
|
Set/clear/toggle value of a flag register
|
|
|
|
Returns:
|
|
- True if success (Bool)
|
|
"""
|
|
|
|
# Eflags bit masks, source vdb
|
|
EFLAGS_CF = 1 << 0
|
|
EFLAGS_PF = 1 << 2
|
|
EFLAGS_AF = 1 << 4
|
|
EFLAGS_ZF = 1 << 6
|
|
EFLAGS_SF = 1 << 7
|
|
EFLAGS_TF = 1 << 8
|
|
EFLAGS_IF = 1 << 9
|
|
EFLAGS_DF = 1 << 10
|
|
EFLAGS_OF = 1 << 11
|
|
|
|
flags = {"carry": "CF", "parity": "PF", "adjust": "AF", "zero": "ZF", "sign": "SF",
|
|
"trap": "TF", "interrupt": "IF", "direction": "DF", "overflow": "OF"}
|
|
|
|
flagname = flagname.lower()
|
|
|
|
if flagname not in flags:
|
|
return False
|
|
|
|
eflags = self.get_eflags()
|
|
if not eflags:
|
|
return False
|
|
|
|
# If value doesn't match the current, or we want to toggle, toggle
|
|
if value is None or eflags[flags[flagname]] != value:
|
|
reg_eflags = self.getreg("eflags")
|
|
reg_eflags ^= eval("EFLAGS_%s" % flags[flagname])
|
|
result = self.execute("set $eflags = 0x%x" % reg_eflags)
|
|
return result
|
|
|
|
return True
|
|
|
|
def eval_target(self, inst):
|
|
"""
|
|
Evaluate target address of an instruction, used for jumpto decision
|
|
|
|
Args:
|
|
- inst: ASM instruction text (String)
|
|
|
|
Returns:
|
|
- target address (Int)
|
|
"""
|
|
|
|
target = None
|
|
inst = inst.strip()
|
|
opcode = inst.split(":\t")[-1].split()[0]
|
|
# this regex includes x86_64 RIP relateive address reference
|
|
p = re.compile(".*?:\s*[^ ]*\s*(.* PTR ).*(0x[^ ]*)")
|
|
m = p.search(inst)
|
|
if not m:
|
|
p = re.compile(".*?:\s.*\s(0x[^ ]*|\w+)")
|
|
m = p.search(inst)
|
|
if m:
|
|
target = m.group(1)
|
|
target = self.parse_and_eval(target)
|
|
else:
|
|
target = None
|
|
else:
|
|
if "]" in m.group(2): # e.g DWORD PTR [ebx+0xc]
|
|
p = re.compile(".*?:\s*[^ ]*\s*(.* PTR ).*\[(.*)\]")
|
|
m = p.search(inst)
|
|
target = self.parse_and_eval("%s[%s]" % (m.group(1), m.group(2).strip()))
|
|
|
|
return to_int(target)
|
|
|
|
def testjump(self, inst=None):
|
|
"""
|
|
Test if jump instruction is taken or not
|
|
|
|
Returns:
|
|
- (status, address of target jumped instruction)
|
|
"""
|
|
|
|
flags = self.get_eflags()
|
|
if not flags:
|
|
return None
|
|
|
|
if not inst:
|
|
pc = self.getreg("pc")
|
|
inst = self.execute_redirect("x/i 0x%x" % pc)
|
|
if not inst:
|
|
return None
|
|
|
|
opcode = inst.split(":\t")[-1].split()[0]
|
|
next_addr = self.eval_target(inst)
|
|
if next_addr is None:
|
|
next_addr = 0
|
|
|
|
if opcode == "jmp":
|
|
return next_addr
|
|
if opcode == "je" and flags["ZF"]:
|
|
return next_addr
|
|
if opcode == "jne" and not flags["ZF"]:
|
|
return next_addr
|
|
if opcode == "jg" and not flags["ZF"] and (flags["SF"] == flags["OF"]):
|
|
return next_addr
|
|
if opcode == "jge" and (flags["SF"] == flags["OF"]):
|
|
return next_addr
|
|
if opcode == "ja" and not flags["CF"] and not flags["ZF"]:
|
|
return next_addr
|
|
if opcode == "jae" and not flags["CF"]:
|
|
return next_addr
|
|
if opcode == "jl" and (flags["SF"] != flags["OF"]):
|
|
return next_addr
|
|
if opcode == "jle" and (flags["ZF"] or (flags["SF"] != flags["OF"])):
|
|
return next_addr
|
|
if opcode == "jb" and flags["CF"]:
|
|
return next_addr
|
|
if opcode == "jbe" and (flags["CF"] or flags["ZF"]):
|
|
return next_addr
|
|
if opcode == "jo" and flags["OF"]:
|
|
return next_addr
|
|
if opcode == "jno" and not flags["OF"]:
|
|
return next_addr
|
|
if opcode == "jz" and flags["ZF"]:
|
|
return next_addr
|
|
if opcode == "jnz" and flags["OF"]:
|
|
return next_addr
|
|
|
|
return None
|
|
|
|
def take_snapshot(self):
|
|
"""
|
|
Take a snapshot of current process
|
|
Warning: this is not thread safe, do not use with multithread program
|
|
|
|
Returns:
|
|
- dictionary of snapshot data
|
|
"""
|
|
if not self.getpid():
|
|
return None
|
|
|
|
maps = self.get_vmmap()
|
|
if not maps:
|
|
return None
|
|
|
|
snapshot = {}
|
|
# get registers
|
|
snapshot["reg"] = self.getregs()
|
|
# get writable memory regions
|
|
snapshot["mem"] = {}
|
|
for (start, end, perm, _) in maps:
|
|
if "w" in perm:
|
|
snapshot["mem"][start] = self.dumpmem(start, end)
|
|
|
|
return snapshot
|
|
|
|
def save_snapshot(self, filename=None):
|
|
"""
|
|
Save a snapshot of current process to file
|
|
Warning: this is not thread safe, do not use with multithread program
|
|
|
|
Args:
|
|
- filename: target file to save snapshot
|
|
|
|
Returns:
|
|
- Bool
|
|
"""
|
|
if not filename:
|
|
filename = self.get_config_filename("snapshot")
|
|
|
|
snapshot = self.take_snapshot()
|
|
if not snapshot:
|
|
return False
|
|
# dump to file
|
|
fd = open(filename, "wb")
|
|
pickle.dump(snapshot, fd, pickle.HIGHEST_PROTOCOL)
|
|
fd.close()
|
|
|
|
return True
|
|
|
|
def give_snapshot(self, snapshot):
|
|
"""
|
|
Restore a saved snapshot of current process
|
|
Warning: this is not thread safe, do not use with multithread program
|
|
|
|
Returns:
|
|
- Bool
|
|
"""
|
|
if not snapshot or not self.getpid():
|
|
return False
|
|
|
|
# restore memory regions
|
|
for (addr, buf) in snapshot["mem"].items():
|
|
self.writemem(addr, buf)
|
|
|
|
# restore registers, SP will be the last one
|
|
for (r, v) in snapshot["reg"].items():
|
|
self.execute("set $%s = 0x%x" % (r, v))
|
|
if r.endswith("sp"):
|
|
sp = v
|
|
self.execute("set $sp = 0x%x" % sp)
|
|
|
|
return True
|
|
|
|
def restore_snapshot(self, filename=None):
|
|
"""
|
|
Restore a saved snapshot of current process from file
|
|
Warning: this is not thread safe, do not use with multithread program
|
|
|
|
Args:
|
|
- file: saved snapshot
|
|
|
|
Returns:
|
|
- Bool
|
|
"""
|
|
if not filename:
|
|
filename = self.get_config_filename("snapshot")
|
|
|
|
fd = open(filename, "rb")
|
|
snapshot = pickle.load(fd)
|
|
return self.give_snapshot(snapshot)
|
|
|
|
|
|
#########################
|
|
# Memory Operations #
|
|
#########################
|
|
@memoized
|
|
def get_vmmap(self, name=None):
|
|
"""
|
|
Get virtual memory mapping address ranges of debugged process
|
|
|
|
Args:
|
|
- name: name/address of binary/library to get mapping range (String)
|
|
+ name = "binary" means debugged program
|
|
+ name = "all" means all virtual maps
|
|
|
|
Returns:
|
|
- list of virtual mapping ranges (start(Int), end(Int), permission(String), mapname(String))
|
|
|
|
"""
|
|
def _get_offline_maps():
|
|
name = self.getfile()
|
|
if not name:
|
|
return None
|
|
headers = self.elfheader()
|
|
binmap = []
|
|
hlist = [x for x in headers.items() if x[1][2] == 'code']
|
|
hlist = sorted(hlist, key=lambda x:x[1][0])
|
|
binmap += [(hlist[0][1][0], hlist[-1][1][1], "rx-p", name)]
|
|
|
|
hlist = [x for x in headers.items() if x[1][2] == 'rodata']
|
|
hlist = sorted(hlist, key=lambda x:x[1][0])
|
|
binmap += [(hlist[0][1][0], hlist[-1][1][1], "r--p", name)]
|
|
|
|
hlist = [x for x in headers.items() if x[1][2] == 'data']
|
|
hlist = sorted(hlist, key=lambda x:x[1][0])
|
|
binmap += [(hlist[0][1][0], hlist[-1][1][1], "rw-p", name)]
|
|
|
|
return binmap
|
|
|
|
def _get_allmaps_osx(pid, remote=False):
|
|
maps = []
|
|
#_DATA 00007fff77975000-00007fff77976000 [ 4K] rw-/rw- SM=COW /usr/lib/system/libremovefile.dylib
|
|
pattern = re.compile("([^\n]*)\s* ([0-9a-f][^-\s]*)-([^\s]*) \[.*\]\s([^/]*).* (.*)")
|
|
|
|
if remote: # remote target, not yet supported
|
|
return maps
|
|
else: # local target
|
|
try: out = execute_external_command("/usr/bin/vmmap -w %s" % self.getpid())
|
|
except: error_msg("could not read vmmap of process")
|
|
|
|
matches = pattern.findall(out)
|
|
if matches:
|
|
for (name, start, end, perm, mapname) in matches:
|
|
if name.startswith("Stack"):
|
|
mapname = "[stack]"
|
|
start = to_int("0x%s" % start)
|
|
end = to_int("0x%s" % end)
|
|
if mapname == "":
|
|
mapname = name.strip()
|
|
maps += [(start, end, perm, mapname)]
|
|
return maps
|
|
|
|
|
|
def _get_allmaps_freebsd(pid, remote=False):
|
|
maps = []
|
|
mpath = "/proc/%s/map" % pid
|
|
# 0x8048000 0x8049000 1 0 0xc36afdd0 r-x 1 0 0x1000 COW NC vnode /path/to/file NCH -1
|
|
pattern = re.compile("0x([0-9a-f]*) 0x([0-9a-f]*)(?: [^ ]*){3} ([rwx-]*)(?: [^ ]*){6} ([^ ]*)")
|
|
|
|
if remote: # remote target, not yet supported
|
|
return maps
|
|
else: # local target
|
|
try: out = open(mpath).read()
|
|
except: error_msg("could not open %s; is procfs mounted?" % mpath)
|
|
|
|
matches = pattern.findall(out)
|
|
if matches:
|
|
for (start, end, perm, mapname) in matches:
|
|
if start[:2] in ["bf", "7f", "ff"] and "rw" in perm:
|
|
mapname = "[stack]"
|
|
start = to_int("0x%s" % start)
|
|
end = to_int("0x%s" % end)
|
|
if mapname == "-":
|
|
if start == maps[-1][1] and maps[-1][-1][0] == "/":
|
|
mapname = maps[-1][-1]
|
|
else:
|
|
mapname = "mapped"
|
|
maps += [(start, end, perm, mapname)]
|
|
return maps
|
|
|
|
def _get_allmaps_linux(pid, remote=False):
|
|
maps = []
|
|
mpath = "/proc/%s/maps" % pid
|
|
#00400000-0040b000 r-xp 00000000 08:02 538840 /path/to/file
|
|
pattern = re.compile("([0-9a-f]*)-([0-9a-f]*) ([rwxps-]*)(?: [^ ]*){3} *(.*)")
|
|
|
|
if remote: # remote target
|
|
tmp = tmpfile()
|
|
self.execute("remote get %s %s" % (mpath, tmp.name))
|
|
tmp.seek(0)
|
|
out = tmp.read()
|
|
tmp.close()
|
|
else: # local target
|
|
out = open(mpath).read()
|
|
|
|
matches = pattern.findall(out)
|
|
if matches:
|
|
for (start, end, perm, mapname) in matches:
|
|
start = to_int("0x%s" % start)
|
|
end = to_int("0x%s" % end)
|
|
if mapname == "":
|
|
mapname = "mapped"
|
|
maps += [(start, end, perm, mapname)]
|
|
return maps
|
|
|
|
result = []
|
|
pid = self.getpid()
|
|
if not pid: # not running, try to use elfheader()
|
|
try:
|
|
return _get_offline_maps()
|
|
except:
|
|
return []
|
|
|
|
# retrieve all maps
|
|
os = self.getos()
|
|
rmt = self.is_target_remote()
|
|
maps = []
|
|
try:
|
|
if os == "FreeBSD": maps = _get_allmaps_freebsd(pid, rmt)
|
|
elif os == "Linux" : maps = _get_allmaps_linux(pid, rmt)
|
|
elif os == "Darwin" : maps = _get_allmaps_osx(pid, rmt)
|
|
except Exception as e:
|
|
if config.Option.get("debug") == "on":
|
|
msg("Exception: %s" %e)
|
|
traceback.print_exc()
|
|
|
|
# select maps matched specific name
|
|
if name == "binary":
|
|
name = self.getfile()
|
|
if name is None or name == "all":
|
|
name = ""
|
|
|
|
if to_int(name) is None:
|
|
for (start, end, perm, mapname) in maps:
|
|
if name in mapname:
|
|
result += [(start, end, perm, mapname)]
|
|
else:
|
|
addr = to_int(name)
|
|
for (start, end, perm, mapname) in maps:
|
|
if start <= addr and addr < end:
|
|
result += [(start, end, perm, mapname)]
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def get_vmrange(self, address, maps=None):
|
|
"""
|
|
Get virtual memory mapping range of an address
|
|
|
|
Args:
|
|
- address: target address (Int)
|
|
- maps: only find in provided maps (List)
|
|
|
|
Returns:
|
|
- tuple of virtual memory info (start, end, perm, mapname)
|
|
"""
|
|
if address is None:
|
|
return None
|
|
if maps is None:
|
|
maps = self.get_vmmap()
|
|
if maps:
|
|
for (start, end, perm, mapname) in maps:
|
|
if start <= address and end > address:
|
|
return (start, end, perm, mapname)
|
|
# failed to get the vmmap
|
|
else:
|
|
try:
|
|
gdb.selected_inferior().read_memory(address, 1)
|
|
start = address & 0xfffffffffffff000
|
|
end = start + 0x1000
|
|
return (start, end, 'rwx', 'unknown')
|
|
except:
|
|
return None
|
|
|
|
|
|
@memoized
|
|
def is_executable(self, address, maps=None):
|
|
"""
|
|
Check if an address is executable
|
|
|
|
Args:
|
|
- address: target address (Int)
|
|
- maps: only check in provided maps (List)
|
|
|
|
Returns:
|
|
- True if address belongs to an executable address range (Bool)
|
|
"""
|
|
vmrange = self.get_vmrange(address, maps)
|
|
if vmrange and "x" in vmrange[2]:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@memoized
|
|
def is_writable(self, address, maps=None):
|
|
"""
|
|
Check if an address is writable
|
|
|
|
Args:
|
|
- address: target address (Int)
|
|
- maps: only check in provided maps (List)
|
|
|
|
Returns:
|
|
- True if address belongs to a writable address range (Bool)
|
|
"""
|
|
vmrange = self.get_vmrange(address, maps)
|
|
if vmrange and "w" in vmrange[2]:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@memoized
|
|
def is_address(self, value, maps=None):
|
|
"""
|
|
Check if a value is a valid address (belongs to a memory region)
|
|
|
|
Args:
|
|
- value (Int)
|
|
- maps: only check in provided maps (List)
|
|
|
|
Returns:
|
|
- True if value belongs to an address range (Bool)
|
|
"""
|
|
vmrange = self.get_vmrange(value, maps)
|
|
return vmrange is not None
|
|
|
|
@memoized
|
|
def get_disasm(self, address, count=1):
|
|
"""
|
|
Get the ASM code of instruction at address
|
|
|
|
Args:
|
|
- address: address to read instruction (Int)
|
|
- count: number of code lines (Int)
|
|
|
|
Returns:
|
|
- asm code (String)
|
|
"""
|
|
code = self.execute_redirect("x/%di 0x%x" % (count, address))
|
|
if code:
|
|
return code.rstrip()
|
|
else:
|
|
return ""
|
|
|
|
def dumpmem(self, start, end):
|
|
"""
|
|
Dump process memory from start to end
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
|
|
Returns:
|
|
- memory content (raw bytes)
|
|
"""
|
|
mem = None
|
|
logfd = tmpfile(is_binary_file=True)
|
|
logname = logfd.name
|
|
out = self.execute_redirect("dump memory %s 0x%x 0x%x" % (logname, start, end))
|
|
if out is None:
|
|
return None
|
|
else:
|
|
logfd.flush()
|
|
mem = logfd.read()
|
|
logfd.close()
|
|
|
|
return mem
|
|
|
|
def readmem(self, address, size):
|
|
"""
|
|
Read content of memory at an address
|
|
|
|
Args:
|
|
- address: start address to read (Int)
|
|
- size: bytes to read (Int)
|
|
|
|
Returns:
|
|
- memory content (raw bytes)
|
|
"""
|
|
# try fast dumpmem if it works
|
|
mem = self.dumpmem(address, address+size)
|
|
if mem is not None:
|
|
return mem
|
|
|
|
# failed to dump, use slow x/gx way
|
|
mem = ""
|
|
out = self.execute_redirect("x/%dbx 0x%x" % (size, address))
|
|
if out:
|
|
for line in out.splitlines():
|
|
bytes = line.split(":\t")[-1].split()
|
|
mem += "".join([chr(int(c, 0)) for c in bytes])
|
|
|
|
return mem
|
|
|
|
def read_int(self, address, intsize=None):
|
|
"""
|
|
Read an interger value from memory
|
|
|
|
Args:
|
|
- address: address to read (Int)
|
|
- intsize: force read size (Int)
|
|
|
|
Returns:
|
|
- mem value (Int)
|
|
"""
|
|
if not intsize:
|
|
intsize = self.intsize()
|
|
value = self.readmem(address, intsize)
|
|
if value:
|
|
value = to_int("0x" + codecs.encode(value[::-1], 'hex'))
|
|
return value
|
|
else:
|
|
return None
|
|
|
|
|
|
def read_long(self, address):
|
|
"""
|
|
Read a long long value from memory
|
|
|
|
Args:
|
|
- address: address to read (Int)
|
|
|
|
Returns:
|
|
- mem value (Long Long)
|
|
"""
|
|
return self.read_int(address, 8)
|
|
|
|
def writemem(self, address, buf):
|
|
"""
|
|
Write buf to memory start at an address
|
|
|
|
Args:
|
|
- address: start address to write (Int)
|
|
- buf: data to write (raw bytes)
|
|
|
|
Returns:
|
|
- number of written bytes (Int)
|
|
"""
|
|
out = None
|
|
if not buf:
|
|
return 0
|
|
|
|
if self.getpid():
|
|
# try fast restore mem
|
|
tmp = tmpfile(is_binary_file=True)
|
|
tmp.write(buf)
|
|
tmp.flush()
|
|
out = self.execute_redirect("restore %s binary 0x%x" % (tmp.name, address))
|
|
tmp.close()
|
|
if not out: # try the slow way
|
|
for i in range(len(buf)):
|
|
if not self.execute("set {char}0x%x = 0x%x" % (address+i, ord(buf[i]))):
|
|
return i
|
|
return i+1
|
|
elif "error" in out: # failed to write the whole buf, find written byte
|
|
for i in range(0, len(buf), 1):
|
|
if not self.is_address(address+i):
|
|
return i
|
|
else:
|
|
return len(buf)
|
|
|
|
def write_int(self, address, value, intsize=None):
|
|
"""
|
|
Write an interger value to memory
|
|
|
|
Args:
|
|
- address: address to read (Int)
|
|
- value: int to write to (Int)
|
|
- intsize: force write size (Int)
|
|
|
|
Returns:
|
|
- Bool
|
|
"""
|
|
if not intsize:
|
|
intsize = self.intsize()
|
|
buf = hex2str(value, intsize).ljust(intsize, "\x00")[:intsize]
|
|
saved = self.readmem(address, intsize)
|
|
if not saved:
|
|
return False
|
|
|
|
ret = self.writemem(address, buf)
|
|
if ret != intsize:
|
|
self.writemem(address, saved)
|
|
return False
|
|
return True
|
|
|
|
def write_long(self, address, value):
|
|
"""
|
|
Write a long long value to memory
|
|
|
|
Args:
|
|
- address: address to read (Int)
|
|
- value: value to write to
|
|
|
|
Returns:
|
|
- Bool
|
|
"""
|
|
return self.write_int(address, value, 8)
|
|
|
|
def cmpmem(self, start, end, buf):
|
|
"""
|
|
Compare contents of a memory region with a buffer
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
- buf: raw bytes
|
|
|
|
Returns:
|
|
- dictionary of array of diffed bytes in hex (Dictionary)
|
|
{123: [("A", "B"), ("C", "C"))]}
|
|
"""
|
|
line_len = 32
|
|
if end < start:
|
|
(start, end) = (end, start)
|
|
|
|
mem = self.dumpmem(start, end)
|
|
if mem is None:
|
|
return None
|
|
|
|
length = min(len(mem), len(buf))
|
|
result = {}
|
|
lineno = 0
|
|
for i in range(length//line_len):
|
|
diff = 0
|
|
bytes_ = []
|
|
for j in range(line_len):
|
|
offset = i*line_len+j
|
|
bytes_ += [(mem[offset:offset + 1], buf[offset:offset + 1])]
|
|
if mem[offset] != buf[offset]:
|
|
diff = 1
|
|
if diff == 1:
|
|
result[start+lineno] = bytes_
|
|
lineno += line_len
|
|
|
|
bytes_ = []
|
|
diff = 0
|
|
for i in range(length % line_len):
|
|
offset = lineno+i
|
|
bytes_ += [(mem[offset:offset + 1], buf[offset:offset + 1])]
|
|
if mem[offset] != buf[offset]:
|
|
diff = 1
|
|
if diff == 1:
|
|
result[start+lineno] = bytes_
|
|
|
|
return result
|
|
|
|
def xormem(self, start, end, key):
|
|
"""
|
|
XOR a memory region with a key
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
- key: XOR key (String)
|
|
|
|
Returns:
|
|
- xored memory content (raw bytes)
|
|
"""
|
|
mem = self.dumpmem(start, end)
|
|
if mem is None:
|
|
return None
|
|
|
|
if to_int(key) != None:
|
|
key = hex2str(to_int(key), self.intsize())
|
|
mem = list(bytes_iterator(mem))
|
|
for index, char in enumerate(mem):
|
|
key_idx = index % len(key)
|
|
mem[index] = chr(ord(char) ^ ord(key[key_idx]))
|
|
|
|
buf = b"".join([to_binary_string(x) for x in mem])
|
|
bytes = self.writemem(start, buf)
|
|
return buf
|
|
|
|
def searchmem(self, start, end, search, mem=None):
|
|
"""
|
|
Search for all instances of a pattern in memory from start to end
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
- search: string or python regex pattern (String)
|
|
- mem: cached mem to not re-read for repeated searches (raw bytes)
|
|
|
|
Returns:
|
|
- list of found result: (address(Int), hex encoded value(String))
|
|
|
|
"""
|
|
|
|
result = []
|
|
if end < start:
|
|
(start, end) = (end, start)
|
|
|
|
if mem is None:
|
|
mem = self.dumpmem(start, end)
|
|
|
|
if not mem:
|
|
return result
|
|
|
|
if isinstance(search, six.string_types) and search.startswith("0x"):
|
|
# hex number
|
|
search = search[2:]
|
|
if len(search) %2 != 0:
|
|
search = "0" + search
|
|
search = codecs.decode(search, 'hex')[::-1]
|
|
search = re.escape(search)
|
|
|
|
# Convert search to bytes if is not already
|
|
if not isinstance(search, bytes):
|
|
search = search.encode('utf-8')
|
|
|
|
try:
|
|
p = re.compile(search)
|
|
except:
|
|
search = re.escape(search)
|
|
p = re.compile(search)
|
|
|
|
found = list(p.finditer(mem))
|
|
for m in found:
|
|
index = 1
|
|
if m.start() == m.end() and m.lastindex:
|
|
index = m.lastindex+1
|
|
for i in range(0,index):
|
|
if m.start(i) != m.end(i):
|
|
result += [(start + m.start(i), codecs.encode(mem[m.start(i):m.end(i)], 'hex'))]
|
|
|
|
return result
|
|
|
|
def searchmem_by_range(self, mapname, search):
|
|
"""
|
|
Search for all instances of a pattern in virtual memory ranges
|
|
|
|
Args:
|
|
- search: string or python regex pattern (String)
|
|
- mapname: name of virtual memory range (String)
|
|
|
|
Returns:
|
|
- list of found result: (address(Int), hex encoded value(String))
|
|
"""
|
|
|
|
result = []
|
|
ranges = self.get_vmmap(mapname)
|
|
if ranges:
|
|
for (start, end, perm, name) in ranges:
|
|
if "r" in perm:
|
|
result += self.searchmem(start, end, search)
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def search_reference(self, search, mapname=None):
|
|
"""
|
|
Search for all references to a value in memory ranges
|
|
|
|
Args:
|
|
- search: string or python regex pattern (String)
|
|
- mapname: name of target virtual memory range (String)
|
|
|
|
Returns:
|
|
- list of found result: (address(int), hex encoded value(String))
|
|
"""
|
|
|
|
maps = self.get_vmmap()
|
|
ranges = self.get_vmmap(mapname)
|
|
result = []
|
|
search_result = []
|
|
for (start, end, perm, name) in maps:
|
|
if "r" in perm:
|
|
search_result += self.searchmem(start, end, search)
|
|
|
|
for (start, end, perm, name) in ranges:
|
|
for (a, v) in search_result:
|
|
result += self.searchmem(start, end, to_address(a))
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def search_address(self, searchfor="stack", belongto="binary"):
|
|
"""
|
|
Search for all valid addresses in memory ranges
|
|
|
|
Args:
|
|
- searchfor: memory region to search for addresses (String)
|
|
- belongto: memory region that target addresses belong to (String)
|
|
|
|
Returns:
|
|
- list of found result: (address(Int), value(Int))
|
|
"""
|
|
|
|
result = []
|
|
maps = self.get_vmmap()
|
|
if maps is None:
|
|
return result
|
|
|
|
searchfor_ranges = self.get_vmmap(searchfor)
|
|
belongto_ranges = self.get_vmmap(belongto)
|
|
step = self.intsize()
|
|
for (start, end, _, _) in searchfor_ranges[::-1]: # dirty trick, to search in rw-p mem first
|
|
mem = self.dumpmem(start, end)
|
|
if not mem:
|
|
continue
|
|
for i in range(0, len(mem), step):
|
|
search = "0x" + codecs.encode(mem[i:i+step][::-1], 'hex').decode('utf-8')
|
|
addr = to_int(search)
|
|
if self.is_address(addr, belongto_ranges):
|
|
result += [(start+i, addr)]
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def search_pointer(self, searchfor="stack", belongto="binary"):
|
|
"""
|
|
Search for all valid pointers in memory ranges
|
|
|
|
Args:
|
|
- searchfor: memory region to search for pointers (String)
|
|
- belongto: memory region that pointed addresses belong to (String)
|
|
|
|
Returns:
|
|
- list of found result: (address(Int), value(Int))
|
|
"""
|
|
|
|
search_result = []
|
|
result = []
|
|
maps = self.get_vmmap()
|
|
searchfor_ranges = self.get_vmmap(searchfor)
|
|
belongto_ranges = self.get_vmmap(belongto)
|
|
step = self.intsize()
|
|
for (start, end, _, _) in searchfor_ranges[::-1]:
|
|
mem = self.dumpmem(start, end)
|
|
if not mem:
|
|
continue
|
|
for i in range(0, len(mem), step):
|
|
search = "0x" + codecs.encode(mem[i:i+step][::-1], 'hex').decode('utf-8')
|
|
addr = to_int(search)
|
|
if self.is_address(addr):
|
|
(v, t, vn) = self.examine_mem_value(addr)
|
|
if t != 'value':
|
|
if self.is_address(to_int(vn), belongto_ranges):
|
|
if (to_int(v), v) not in search_result:
|
|
search_result += [(to_int(v), v)]
|
|
|
|
for (a, v) in search_result:
|
|
result += self.searchmem(start, end, to_address(a), mem)
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def examine_mem_value(self, value):
|
|
"""
|
|
Examine a value in memory for its type and reference
|
|
|
|
Args:
|
|
- value: value to examine (Int)
|
|
|
|
Returns:
|
|
- tuple of (value(Int), type(String), next_value(Int))
|
|
"""
|
|
def examine_data(value, bits=32):
|
|
out = self.execute_redirect("x/%sx 0x%x" % ("g" if bits == 64 else "w", value))
|
|
if out:
|
|
v = out.split(":\t")[-1].strip()
|
|
if is_printable(int2hexstr(to_int(v), bits//8)):
|
|
out = self.execute_redirect("x/s 0x%x" % value)
|
|
return out
|
|
|
|
result = (None, None, None)
|
|
if value is None:
|
|
return result
|
|
|
|
maps = self.get_vmmap()
|
|
binmap = self.get_vmmap("binary")
|
|
|
|
(arch, bits) = self.getarch()
|
|
if not self.is_address(value): # a value
|
|
result = (to_hex(value), "value", "")
|
|
return result
|
|
else:
|
|
(_, _, _, mapname) = self.get_vmrange(value)
|
|
|
|
# check for writable first so rwxp mem will be treated as data
|
|
if self.is_writable(value): # writable data address
|
|
out = examine_data(value, bits)
|
|
if out:
|
|
result = (to_hex(value), "data", out.split(":", 1)[1].strip())
|
|
|
|
elif self.is_executable(value): # code/rodata address
|
|
if self.is_address(value, binmap):
|
|
headers = self.elfheader()
|
|
else:
|
|
headers = self.elfheader_solib(mapname)
|
|
|
|
if headers:
|
|
headers = sorted(headers.items(), key=lambda x: x[1][1])
|
|
for (k, (start, end, type)) in headers:
|
|
if value >= start and value < end:
|
|
if type == "code":
|
|
out = self.get_disasm(value)
|
|
p = re.compile(".*?0x[^ ]*?\s(.*)")
|
|
m = p.search(out)
|
|
result = (to_hex(value), "code", m.group(1))
|
|
else: # rodata address
|
|
out = examine_data(value, bits)
|
|
result = (to_hex(value), "rodata", out.split(":", 1)[1].strip())
|
|
break
|
|
|
|
if result[0] is None: # not fall to any header section
|
|
out = examine_data(value, bits)
|
|
result = (to_hex(value), "rodata", out.split(":", 1)[1].strip())
|
|
|
|
else: # not belong to any lib: [heap], [vdso], [vsyscall], etc
|
|
out = self.get_disasm(value)
|
|
if "(bad)" in out:
|
|
out = examine_data(value, bits)
|
|
result = (to_hex(value), "rodata", out.split(":", 1)[1].strip())
|
|
else:
|
|
p = re.compile(".*?0x[^ ]*?\s(.*)")
|
|
m = p.search(out)
|
|
result = (to_hex(value), "code", m.group(1))
|
|
|
|
else: # readonly data address
|
|
out = examine_data(value, bits)
|
|
if out:
|
|
result = (to_hex(value), "rodata", out.split(":", 1)[1].strip())
|
|
else:
|
|
result = (to_hex(value), "rodata", "MemError")
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def examine_mem_reference(self, value, depth=5):
|
|
"""
|
|
Deeply examine a value in memory for its references
|
|
|
|
Args:
|
|
- value: value to examine (Int)
|
|
|
|
Returns:
|
|
- list of tuple of (value(Int), type(String), next_value(Int))
|
|
"""
|
|
result = []
|
|
if depth <= 0:
|
|
depth = 0xffffffff
|
|
|
|
(v, t, vn) = self.examine_mem_value(value)
|
|
while vn is not None:
|
|
if len(result) > depth:
|
|
_v, _t, _vn = result[-1]
|
|
result[-1] = (_v, _t, "--> ...")
|
|
break
|
|
|
|
result += [(v, t, vn)]
|
|
if v == vn or to_int(v) == to_int(vn): # point to self
|
|
break
|
|
if to_int(vn) is None:
|
|
break
|
|
if to_int(vn) in [to_int(v) for (v, _, _) in result]: # point back to previous value
|
|
break
|
|
(v, t, vn) = self.examine_mem_value(to_int(vn))
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def format_search_result(self, result, display=256):
|
|
"""
|
|
Format the result from various memory search commands
|
|
|
|
Args:
|
|
- result: result of search commands (List)
|
|
- display: number of items to display
|
|
|
|
Returns:
|
|
- text: formatted text (String)
|
|
"""
|
|
|
|
text = ""
|
|
if not result:
|
|
text = "Not found"
|
|
else:
|
|
maxlen = 0
|
|
maps = self.get_vmmap()
|
|
shortmaps = []
|
|
for (start, end, perm, name) in maps:
|
|
shortname = os.path.basename(name)
|
|
if shortname.startswith("lib"):
|
|
shortname = shortname.split("-")[0]
|
|
shortmaps += [(start, end, perm, shortname)]
|
|
|
|
count = len(result)
|
|
if display != 0:
|
|
count = min(count, display)
|
|
text += "Found %d results, display max %d items:\n" % (len(result), count)
|
|
for (addr, v) in result[:count]:
|
|
vmrange = self.get_vmrange(addr, shortmaps)
|
|
maxlen = max(maxlen, len(vmrange[3]))
|
|
|
|
for (addr, v) in result[:count]:
|
|
vmrange = self.get_vmrange(addr, shortmaps)
|
|
chain = self.examine_mem_reference(addr)
|
|
text += "%s : %s" % (vmrange[3].rjust(maxlen), format_reference_chain(chain) + "\n")
|
|
|
|
return text
|
|
|
|
|
|
##########################
|
|
# Exploit Helpers #
|
|
##########################
|
|
@memoized
|
|
def elfentry(self):
|
|
"""
|
|
Get entry point address of debugged ELF file
|
|
|
|
Returns:
|
|
- entry address (Int)
|
|
"""
|
|
out = self.execute_redirect("info files")
|
|
p = re.compile("Entry point: ([^\s]*)")
|
|
if out:
|
|
m = p.search(out)
|
|
if m:
|
|
return to_int(m.group(1))
|
|
return None
|
|
|
|
@memoized
|
|
def elfheader(self, name=None):
|
|
"""
|
|
Get headers information of debugged ELF file
|
|
|
|
Args:
|
|
- name: specific header name (String)
|
|
|
|
Returns:
|
|
- dictionary of headers {name(String): (start(Int), end(Int), type(String))}
|
|
"""
|
|
elfinfo = {}
|
|
elfbase = 0
|
|
if self.getpid():
|
|
binmap = self.get_vmmap("binary")
|
|
elfbase = binmap[0][0] if binmap else 0
|
|
|
|
out = self.execute_redirect("maintenance info sections")
|
|
if not out:
|
|
return {}
|
|
|
|
p = re.compile("\s*(0x[^-]*)->(0x[^ ]*) at (0x[^:]*):\s*([^ ]*)\s*(.*)")
|
|
matches = p.findall(out)
|
|
|
|
for (start, end, offset, hname, attr) in matches:
|
|
start, end, offset = to_int(start), to_int(end), to_int(offset)
|
|
# skip unuseful header
|
|
if start < offset:
|
|
continue
|
|
# if PIE binary, update with runtime address
|
|
if start < elfbase:
|
|
start += elfbase
|
|
end += elfbase
|
|
|
|
if "CODE" in attr:
|
|
htype = "code"
|
|
elif "READONLY" in attr:
|
|
htype = "rodata"
|
|
else:
|
|
htype = "data"
|
|
|
|
elfinfo[hname.strip()] = (start, end, htype)
|
|
|
|
result = {}
|
|
if name is None:
|
|
result = elfinfo
|
|
else:
|
|
if name in elfinfo:
|
|
result[name] = elfinfo[name]
|
|
else:
|
|
for (k, v) in elfinfo.items():
|
|
if name in k:
|
|
result[k] = v
|
|
return result
|
|
|
|
@memoized
|
|
def elfsymbols(self, pattern=None):
|
|
"""
|
|
Get all non-debugging symbol information of debugged ELF file
|
|
|
|
Returns:
|
|
- dictionary of (address(Int), symname(String))
|
|
"""
|
|
headers = self.elfheader()
|
|
if ".plt" not in headers: # static binary
|
|
return {}
|
|
|
|
binmap = self.get_vmmap("binary")
|
|
elfbase = binmap[0][0] if binmap else 0
|
|
|
|
# get the .dynstr header
|
|
headers = self.elfheader()
|
|
if ".dynstr" not in headers:
|
|
return {}
|
|
(start, end, _) = headers[".dynstr"]
|
|
mem = self.dumpmem(start, end)
|
|
if not mem and self.getfile():
|
|
fd = open(self.getfile())
|
|
fd.seek(start, 0)
|
|
mem = fd.read(end-start)
|
|
fd.close()
|
|
|
|
# Convert names into strings
|
|
dynstrings = [name.decode('utf-8') for name in mem.split(b"\x00")]
|
|
|
|
if pattern:
|
|
dynstrings = [s for s in dynstrings if re.search(pattern, s)]
|
|
|
|
# get symname@plt info
|
|
symbols = {}
|
|
for symname in dynstrings:
|
|
if not symname: continue
|
|
symname += "@plt"
|
|
out = self.execute_redirect("info functions %s" % symname)
|
|
if not out: continue
|
|
m = re.findall(".*(0x[^ ]*)\s*%s" % re.escape(symname), out)
|
|
for addr in m:
|
|
addr = to_int(addr)
|
|
if self.is_address(addr, binmap):
|
|
if symname not in symbols:
|
|
symbols[symname] = addr
|
|
break
|
|
|
|
# if PIE binary, update with runtime address
|
|
for (k, v) in symbols.items():
|
|
if v < elfbase:
|
|
symbols[k] = v + elfbase
|
|
|
|
return symbols
|
|
|
|
@memoized
|
|
def elfsymbol(self, symname=None):
|
|
"""
|
|
Get non-debugging symbol information of debugged ELF file
|
|
|
|
Args:
|
|
- name: target function name (String), special cases:
|
|
+ "data": data transfer functions
|
|
+ "exec": exec helper functions
|
|
|
|
Returns:
|
|
- if exact name is not provided: dictionary of tuple (symname, plt_entry)
|
|
- if exact name is provided: dictionary of tuple (symname, plt_entry, got_entry, reloc_entry)
|
|
"""
|
|
datafuncs = ["printf", "puts", "gets", "cpy"]
|
|
execfuncs = ["system", "exec", "mprotect", "mmap", "syscall"]
|
|
result = {}
|
|
if not symname or symname in ["data", "exec"]:
|
|
symbols = self.elfsymbols()
|
|
else:
|
|
symbols = self.elfsymbols(symname)
|
|
|
|
if not symname:
|
|
result = symbols
|
|
else:
|
|
sname = symname.replace("@plt", "") + "@plt"
|
|
if sname in symbols:
|
|
plt_addr = symbols[sname]
|
|
result[sname] = plt_addr # plt entry
|
|
out = self.get_disasm(plt_addr, 2)
|
|
for line in out.splitlines():
|
|
if "jmp" in line:
|
|
addr = to_int("0x" + line.strip().rsplit("0x")[-1].split()[0])
|
|
result[sname.replace("@plt","@got")] = addr # got entry
|
|
if "push" in line:
|
|
addr = to_int("0x" + line.strip().rsplit("0x")[-1])
|
|
result[sname.replace("@plt","@reloc")] = addr # reloc offset
|
|
else:
|
|
keywords = [symname]
|
|
if symname == "data":
|
|
keywords = datafuncs
|
|
if symname == "exec":
|
|
keywords = execfuncs
|
|
for (k, v) in symbols.items():
|
|
for f in keywords:
|
|
if f in k:
|
|
result[k] = v
|
|
|
|
return result
|
|
|
|
@memoized
|
|
def main_entry(self):
|
|
"""
|
|
Get address of main function of stripped ELF file
|
|
|
|
Returns:
|
|
- main function address (Int)
|
|
"""
|
|
refs = self.xrefs("__libc_start_main@plt")
|
|
if refs:
|
|
inst = self.prev_inst(refs[0][0])
|
|
if inst:
|
|
addr = re.search(".*(0x.*)", inst[0][1])
|
|
if addr:
|
|
return to_int(addr.group(1))
|
|
return None
|
|
|
|
@memoized
|
|
def readelf_header(self, filename, name=None):
|
|
"""
|
|
Get headers information of an ELF file using 'readelf'
|
|
|
|
Args:
|
|
- filename: ELF file (String)
|
|
- name: specific header name (String)
|
|
|
|
Returns:
|
|
- dictionary of headers (name(String), value(Int)) (Dict)
|
|
"""
|
|
elfinfo = {}
|
|
vmap = self.get_vmmap(filename)
|
|
elfbase = vmap[0][0] if vmap else 0
|
|
out = execute_external_command("%s -W -S %s" % (config.READELF, filename))
|
|
if not out:
|
|
return {}
|
|
p = re.compile(".*\[.*\] (\.[^ ]*) [^0-9]* ([^ ]*) [^ ]* ([^ ]*)(.*)")
|
|
matches = p.findall(out)
|
|
if not matches:
|
|
return result
|
|
|
|
for (hname, start, size, attr) in matches:
|
|
start, end = to_int("0x"+start), to_int("0x"+start) + to_int("0x"+size)
|
|
# if PIE binary or DSO, update with runtime address
|
|
if start < elfbase:
|
|
start += elfbase
|
|
if end < elfbase:
|
|
end += elfbase
|
|
|
|
if "X" in attr:
|
|
htype = "code"
|
|
elif "W" in attr:
|
|
htype = "data"
|
|
else:
|
|
htype = "rodata"
|
|
elfinfo[hname.strip()] = (start, end, htype)
|
|
|
|
result = {}
|
|
if name is None:
|
|
result = elfinfo
|
|
else:
|
|
if name in elfinfo:
|
|
result[name] = elfinfo[name]
|
|
else:
|
|
for (k, v) in elfinfo.items():
|
|
if name in k:
|
|
result[k] = v
|
|
return result
|
|
|
|
@memoized
|
|
def elfheader_solib(self, solib=None, name=None):
|
|
"""
|
|
Get headers information of Shared Object Libraries linked to target
|
|
|
|
Args:
|
|
- solib: shared library name (String)
|
|
- name: specific header name (String)
|
|
|
|
Returns:
|
|
- dictionary of headers {name(String): start(Int), end(Int), type(String))
|
|
"""
|
|
# hardcoded ELF header type
|
|
header_type = {"code": [".text", ".fini", ".init", ".plt", "__libc_freeres_fn"],
|
|
"data": [".dynamic", ".data", ".ctors", ".dtors", ".jrc", ".got", ".got.plt",
|
|
".bss", ".tdata", ".tbss", ".data.rel.ro", ".fini_array",
|
|
"__libc_subfreeres", "__libc_thread_subfreeres"]
|
|
}
|
|
|
|
@memoized
|
|
def _elfheader_solib_all():
|
|
out = self.execute_redirect("info files")
|
|
if not out:
|
|
return None
|
|
|
|
p = re.compile("[^\n]*\s*(0x[^ ]*) - (0x[^ ]*) is (\.[^ ]*) in (.*)")
|
|
soheaders = p.findall(out)
|
|
|
|
result = []
|
|
for (start, end, hname, libname) in soheaders:
|
|
start, end = to_int(start), to_int(end)
|
|
result += [(start, end, hname, os.path.realpath(libname))] # tricky, return the realpath version of libraries
|
|
return result
|
|
|
|
elfinfo = {}
|
|
|
|
headers = _elfheader_solib_all()
|
|
if not headers:
|
|
return {}
|
|
|
|
if solib is None:
|
|
return headers
|
|
|
|
vmap = self.get_vmmap(solib)
|
|
elfbase = vmap[0][0] if vmap else 0
|
|
|
|
for (start, end, hname, libname) in headers:
|
|
if solib in libname:
|
|
# if PIE binary or DSO, update with runtime address
|
|
if start < elfbase:
|
|
start += elfbase
|
|
if end < elfbase:
|
|
end += elfbase
|
|
# determine the type
|
|
htype = "rodata"
|
|
if hname in header_type["code"]:
|
|
htype = "code"
|
|
elif hname in header_type["data"]:
|
|
htype = "data"
|
|
elfinfo[hname.strip()] = (start, end, htype)
|
|
|
|
result = {}
|
|
if name is None:
|
|
result = elfinfo
|
|
else:
|
|
if name in elfinfo:
|
|
result[name] = elfinfo[name]
|
|
else:
|
|
for (k, v) in elfinfo.items():
|
|
if name in k:
|
|
result[k] = v
|
|
return result
|
|
|
|
def checksec(self, filename=None):
|
|
"""
|
|
Check for various security options of binary (ref: http://www.trapkit.de/tools/checksec.sh)
|
|
|
|
Args:
|
|
- file: path name of file to check (String)
|
|
|
|
Returns:
|
|
- dictionary of (setting(String), status(Int)) (Dict)
|
|
"""
|
|
result = {}
|
|
result["RELRO"] = 0
|
|
result["CANARY"] = 0
|
|
result["NX"] = 1
|
|
result["PIE"] = 0
|
|
result["FORTIFY"] = 0
|
|
|
|
if filename is None:
|
|
filename = self.getfile()
|
|
|
|
if not filename:
|
|
return None
|
|
|
|
out = execute_external_command("%s -W -a \"%s\" 2>&1" % (config.READELF, filename))
|
|
if "Error:" in out:
|
|
return None
|
|
|
|
for line in out.splitlines():
|
|
if "GNU_RELRO" in line:
|
|
result["RELRO"] |= 2
|
|
if "BIND_NOW" in line:
|
|
result["RELRO"] |= 1
|
|
if "__stack_chk_fail" in line:
|
|
result["CANARY"] = 1
|
|
if "GNU_STACK" in line and "RWE" in line:
|
|
result["NX"] = 0
|
|
if "Type:" in line and "DYN (" in line:
|
|
result["PIE"] = 4 # Dynamic Shared Object
|
|
if "(DEBUG)" in line and result["PIE"] == 4:
|
|
result["PIE"] = 1
|
|
if "_chk@" in line:
|
|
result["FORTIFY"] = 1
|
|
|
|
if result["RELRO"] == 1:
|
|
result["RELRO"] = 0 # ? | BIND_NOW + NO GNU_RELRO = NO PROTECTION
|
|
# result["RELRO"] == 2 # Partial | NO BIND_NOW + GNU_RELRO
|
|
# result["RELRO"] == 3 # Full | BIND_NOW + GNU_RELRO
|
|
return result
|
|
|
|
def _verify_rop_gadget(self, start, end, depth=5):
|
|
"""
|
|
Verify ROP gadget code from start to end with max number of instructions
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end addres (Int)
|
|
- depth: number of instructions (Int)
|
|
|
|
Returns:
|
|
- list of valid gadgets (address(Int), asmcode(String))
|
|
"""
|
|
|
|
result = []
|
|
valid = 0
|
|
out = self.execute_redirect("disassemble 0x%x, 0x%x" % (start, end+1))
|
|
if not out:
|
|
return []
|
|
|
|
code = out.splitlines()[1:-1]
|
|
for line in code:
|
|
if "bad" in line:
|
|
return []
|
|
(addr, code) = line.strip().split(":", 1)
|
|
addr = to_int(addr.split()[0])
|
|
result += [(addr, " ".join(code.strip().split()))]
|
|
if "ret" in code:
|
|
return result
|
|
if len(result) > depth:
|
|
break
|
|
|
|
return []
|
|
|
|
@memoized
|
|
def search_asm(self, start, end, asmcode, rop=0):
|
|
"""
|
|
Search for ASM instructions in memory
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
- asmcode: assembly instruction (String)
|
|
+ multiple instructions are separated by ";"
|
|
+ wildcard ? supported, will be replaced by registers or multi-bytes
|
|
|
|
Returns:
|
|
- list of (address(Int), hexbyte(String))
|
|
"""
|
|
wildcard = asmcode.count('?')
|
|
magic_bytes = ["0x00", "0xff", "0xdead", "0xdeadbeef", "0xdeadbeefdeadbeef"]
|
|
|
|
ops = [x for x in asmcode.split(';') if x]
|
|
def buildcode(code=b"", pos=0, depth=0):
|
|
if depth == wildcard and pos == len(ops):
|
|
yield code
|
|
return
|
|
|
|
c = ops[pos].count('?')
|
|
if c > 2: return
|
|
elif c == 0:
|
|
asm = self.assemble(ops[pos])
|
|
if asm:
|
|
for code in buildcode(code + asm, pos+1, depth):
|
|
yield code
|
|
else:
|
|
save = ops[pos]
|
|
for regs in REGISTERS.values():
|
|
for reg in regs:
|
|
ops[pos] = save.replace("?", reg, 1)
|
|
for asmcode_reg in buildcode(code, pos, depth+1):
|
|
yield asmcode_reg
|
|
for byte in magic_bytes:
|
|
ops[pos] = save.replace("?", byte, 1)
|
|
for asmcode_mem in buildcode(code, pos, depth+1):
|
|
yield asmcode_mem
|
|
ops[pos] = save
|
|
|
|
searches = []
|
|
|
|
def decode_hex_escape(str_):
|
|
"""Decode string as hex and escape for regex"""
|
|
return re.escape(codecs.decode(str_, 'hex'))
|
|
|
|
for machine_code in buildcode():
|
|
search = re.escape(machine_code)
|
|
search = search.replace(decode_hex_escape(b"dead"), b"..")\
|
|
.replace(decode_hex_escape(b"beef"), b"..")\
|
|
.replace(decode_hex_escape(b"00"), b".")\
|
|
.replace(decode_hex_escape(b"ff"), b".")
|
|
|
|
if rop and 'ret' not in asmcode:
|
|
search += b".{0,24}\\xc3"
|
|
searches.append(search)
|
|
|
|
if not searches:
|
|
warning_msg("invalid asmcode: '%s'" % asmcode)
|
|
return []
|
|
|
|
search = b"(?=(" + b"|".join(searches) + b"))"
|
|
candidates = self.searchmem(start, end, search)
|
|
|
|
if rop:
|
|
result = {}
|
|
for (a, v) in candidates:
|
|
gadget = self._verify_rop_gadget(a, a+len(v)//2 - 1)
|
|
# gadget format: [(address, asmcode), (address, asmcode), ...]
|
|
if gadget != []:
|
|
blen = gadget[-1][0] - gadget[0][0] + 1
|
|
bytes = v[:2*blen]
|
|
asmcode_rs = "; ".join([c for _, c in gadget])
|
|
if re.search(re.escape(asmcode).replace("\ ",".*").replace("\?",".*"), asmcode_rs)\
|
|
and a not in result:
|
|
result[a] = (bytes, asmcode_rs)
|
|
result = list(result.items())
|
|
else:
|
|
result = []
|
|
for (a, v) in candidates:
|
|
asmcode = self.execute_redirect("disassemble 0x%x, 0x%x" % (a, a+(len(v)//2)))
|
|
if asmcode:
|
|
asmcode = "\n".join(asmcode.splitlines()[1:-1])
|
|
matches = re.findall(".*:([^\n]*)", asmcode)
|
|
result += [(a, (v, ";".join(matches).strip()))]
|
|
|
|
return result
|
|
|
|
def dumprop(self, start, end, keyword=None, depth=5):
|
|
"""
|
|
Dump unique ROP gadgets in memory
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
- keyword: to match start of gadgets (String)
|
|
|
|
Returns:
|
|
- dictionary of (address(Int), asmcode(String))
|
|
"""
|
|
|
|
EXTRA_WORDS = ["BYTE ", " WORD", "DWORD ", "FWORD ", "QWORD ", "PTR ", "FAR "]
|
|
result = {}
|
|
mem = self.dumpmem(start, end)
|
|
if mem is None:
|
|
return {}
|
|
|
|
if keyword:
|
|
search = keyword
|
|
else:
|
|
search = ""
|
|
|
|
if len(mem) > 20000: # limit backward depth if searching in large mem
|
|
depth = 3
|
|
found = re.finditer(b"\xc3", mem)
|
|
found = list(found)
|
|
for m in found:
|
|
idx = start+m.start()
|
|
for i in range(1, 24):
|
|
gadget = self._verify_rop_gadget(idx-i, idx, depth)
|
|
if gadget != []:
|
|
k = "; ".join([v for (a, v) in gadget])
|
|
if k.startswith(search):
|
|
for w in EXTRA_WORDS:
|
|
k = k.replace(w, "")
|
|
if k not in result:
|
|
result[k] = gadget[0][0]
|
|
return result
|
|
|
|
def common_rop_gadget(self, mapname=None):
|
|
"""
|
|
Get common rop gadgets in binary: ret, popret, pop2ret, pop3ret, add [mem] reg, add reg [mem]
|
|
|
|
Returns:
|
|
- dictionary of (gadget(String), address(Int))
|
|
"""
|
|
|
|
def _valid_register_opcode(bytes_):
|
|
if not bytes_:
|
|
return False
|
|
|
|
for c in bytes_iterator(bytes_):
|
|
if ord(c) not in list(range(0x58, 0x60)):
|
|
return False
|
|
return True
|
|
|
|
result = {}
|
|
if mapname is None:
|
|
mapname = "binary"
|
|
maps = self.get_vmmap(mapname)
|
|
if maps is None:
|
|
return result
|
|
|
|
for (start, end, _, _) in maps:
|
|
if not self.is_executable(start, maps): continue
|
|
|
|
mem = self.dumpmem(start, end)
|
|
found = self.searchmem(start, end, b"....\xc3", mem)
|
|
for (a, v) in found:
|
|
v = codecs.decode(v, 'hex')
|
|
if "ret" not in result:
|
|
result["ret"] = a+4
|
|
if "leaveret" not in result:
|
|
if v[-2] == "\xc9":
|
|
result["leaveret"] = a+3
|
|
if "popret" not in result:
|
|
if _valid_register_opcode(v[-2:-1]):
|
|
result["popret"] = a+3
|
|
if "pop2ret" not in result:
|
|
if _valid_register_opcode(v[-3:-1]):
|
|
result["pop2ret"] = a+2
|
|
if "pop3ret" not in result:
|
|
if _valid_register_opcode(v[-4:-1]):
|
|
result["pop3ret"] = a+1
|
|
if "pop4ret" not in result:
|
|
if _valid_register_opcode(v[-5:-1]):
|
|
result["pop4ret"] = a
|
|
|
|
# search for add esp, byte 0xNN
|
|
found = self.searchmem(start, end, b"\x83\xc4([^\xc3]){0,24}\xc3", mem)
|
|
# search for add esp, 0xNNNN
|
|
found += self.searchmem(start, end, b"\x81\xc4([^\xc3]){0,24}\xc3", mem)
|
|
for (a, v) in found:
|
|
if v.startswith(b"81"):
|
|
offset = to_int("0x" + codecs.encode(codecs.decode(v, 'hex')[2:5][::-1], 'hex').decode('utf-8'))
|
|
elif v.startswith(b"83"):
|
|
offset = to_int("0x" + v[4:6].decode('utf-8'))
|
|
gg = self._verify_rop_gadget(a, a+len(v)//2-1)
|
|
for (_, c) in gg:
|
|
if "pop" in c:
|
|
offset += 4
|
|
gadget = "addesp_%d" % offset
|
|
if gadget not in result:
|
|
result[gadget] = a
|
|
|
|
return result
|
|
|
|
def search_jmpcall(self, start, end, regname=None):
|
|
"""
|
|
Search memory for jmp/call reg instructions
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
- reg: register name (String)
|
|
|
|
Returns:
|
|
- list of (address(Int), instruction(String))
|
|
"""
|
|
|
|
result = []
|
|
REG = {0: "eax", 1: "ecx", 2: "edx", 3: "ebx", 4: "esp", 5: "ebp", 6: "esi", 7:"edi"}
|
|
P2REG = {0: "[eax]", 1: "[ecx]", 2: "[edx]", 3: "[ebx]", 6: "[esi]", 7:"[edi]"}
|
|
OPCODE = {0xe: "jmp", 0xd: "call"}
|
|
P2OPCODE = {0x1: "call", 0x2: "jmp"}
|
|
JMPREG = [b"\xff" + bytes_chr(i) for i in range(0xe0, 0xe8)]
|
|
JMPREG += [b"\xff" + bytes_chr(i) for i in range(0x20, 0x28)]
|
|
CALLREG = [b"\xff" + bytes_chr(i) for i in range(0xd0, 0xd8)]
|
|
CALLREG += [b"\xff" + bytes_chr(i) for i in range(0x10, 0x18)]
|
|
JMPCALL = JMPREG + CALLREG
|
|
|
|
if regname is None:
|
|
regname = ""
|
|
regname = regname.lower()
|
|
pattern = re.compile(b'|'.join(JMPCALL).replace(b' ', b'\ '))
|
|
mem = self.dumpmem(start, end)
|
|
found = pattern.finditer(mem)
|
|
(arch, bits) = self.getarch()
|
|
for m in list(found):
|
|
inst = ""
|
|
addr = start + m.start()
|
|
opcode = codecs.encode(m.group()[1:2], 'hex')
|
|
type = int(opcode[0:1], 16)
|
|
reg = int(opcode[1:2], 16)
|
|
if type in OPCODE:
|
|
inst = OPCODE[type] + " " + REG[reg]
|
|
|
|
if type in P2OPCODE and reg in P2REG:
|
|
inst = P2OPCODE[type] + " " + P2REG[reg]
|
|
|
|
if inst != "" and regname[-2:] in inst.split()[-1]:
|
|
if bits == 64:
|
|
inst = inst.replace("e", "r")
|
|
result += [(addr, inst)]
|
|
|
|
return result
|
|
|
|
def search_substr(self, start, end, search, mem=None):
|
|
"""
|
|
Search for substrings of a given string/number in memory
|
|
|
|
Args:
|
|
- start: start address (Int)
|
|
- end: end address (Int)
|
|
- search: string to search for (String)
|
|
- mem: cached memory (raw bytes)
|
|
|
|
Returns:
|
|
- list of tuple (substr(String), address(Int))
|
|
"""
|
|
def substr(s1, s2):
|
|
"Search for a string in another string"
|
|
s1 = to_binary_string(s1)
|
|
s2 = to_binary_string(s2)
|
|
i = 1
|
|
found = 0
|
|
while i <= len(s1):
|
|
if s2.find(s1[:i]) != -1:
|
|
found = 1
|
|
i += 1
|
|
if s1[:i-1][-1:] == b"\x00":
|
|
break
|
|
else:
|
|
break
|
|
if found == 1:
|
|
return i-1
|
|
else:
|
|
return -1
|
|
|
|
result = []
|
|
if end < start:
|
|
start, end = end, start
|
|
|
|
if mem is None:
|
|
mem = self.dumpmem(start, end)
|
|
|
|
if search[:2] == "0x": # hex number
|
|
search = search[2:]
|
|
if len(search) %2 != 0:
|
|
search = "0" + search
|
|
search = codecs.decode(search, 'hex')[::-1]
|
|
search = to_binary_string(decode_string_escape(search))
|
|
while search:
|
|
l = len(search)
|
|
i = substr(search, mem)
|
|
if i != -1:
|
|
sub = search[:i]
|
|
addr = start + mem.find(sub)
|
|
if not check_badchars(addr):
|
|
result.append((sub, addr))
|
|
else:
|
|
result.append((search, -1))
|
|
return result
|
|
search = search[i:]
|
|
return result
|
|
|
|
|
|
##############################
|
|
# ROP Payload Generation #
|
|
##############################
|
|
def payload_copybytes(self, target=None, data=None, template=0):
|
|
"""
|
|
Suggest function for ret2plt exploit and generate payload for it
|
|
|
|
Args:
|
|
- target: address to copy data to (Int)
|
|
- data: (String)
|
|
Returns:
|
|
- python code template (String)
|
|
"""
|
|
result = ""
|
|
funcs = ["strcpy", "sprintf", "strncpy", "snprintf", "memcpy"]
|
|
|
|
symbols = self.elfsymbols()
|
|
transfer = ""
|
|
for f in funcs:
|
|
if f+"@plt" in symbols:
|
|
transfer = f
|
|
break
|
|
if transfer == "":
|
|
warning_msg("No copy function available")
|
|
return None
|
|
|
|
headers = self.elfheader()
|
|
start = min([v[0] for (k, v) in headers.items() if v[0] > 0])
|
|
end = max([v[1] for (k, v) in headers.items() if v[2] != "data"])
|
|
symbols = self.elfsymbol(transfer)
|
|
if not symbols:
|
|
warning_msg("Unable to find symbols")
|
|
return None
|
|
|
|
plt_func = transfer + "_plt"
|
|
plt_addr = symbols[transfer+"@plt"]
|
|
gadgets = self.common_rop_gadget()
|
|
function_template = "\n".join([
|
|
"popret = 0x%x" % gadgets["popret"],
|
|
"pop2ret = 0x%x" % gadgets["pop2ret"],
|
|
"pop3ret = 0x%x" % gadgets["pop3ret"],
|
|
"def %s_payload(target, bytes):" % transfer,
|
|
" %s = 0x%x" % (plt_func, plt_addr),
|
|
" payload = []",
|
|
" offset = 0",
|
|
" for (str, addr) in bytes:",
|
|
"",
|
|
])
|
|
if "ncp" in transfer or "mem" in transfer: # memcpy() style
|
|
function_template += "\n".join([
|
|
" payload += [%s, pop3ret, target+offset, addr, len(str)]" % plt_func,
|
|
" offset += len(str)",
|
|
])
|
|
elif "snp" in transfer: # snprintf()
|
|
function_template += "\n".join([
|
|
" payload += [%s, pop3ret, target+offset, len(str)+1, addr]" % plt_func,
|
|
" offset += len(str)",
|
|
])
|
|
else:
|
|
function_template += "\n".join([
|
|
" payload += [%s, pop2ret, target+offset, addr]" % plt_func,
|
|
" offset += len(str)",
|
|
])
|
|
function_template += "\n".join(["",
|
|
" return payload",
|
|
"",
|
|
"payload = []"
|
|
])
|
|
|
|
if target is None:
|
|
if template != 0:
|
|
return function_template
|
|
else:
|
|
return ""
|
|
|
|
#text = "\n_payload = []\n"
|
|
text = "\n"
|
|
mem = self.dumpmem(start, end)
|
|
bytes = self.search_substr(start, end, data, mem)
|
|
|
|
if to_int(target) is not None:
|
|
target = to_hex(target)
|
|
text += "# %s <= %s\n" % (target, repr(data))
|
|
if not bytes:
|
|
text += "***Failed***\n"
|
|
else:
|
|
text += "bytes = [\n"
|
|
for (s, a) in bytes:
|
|
if a != -1:
|
|
text += " (%s, %s),\n" % (repr(s), to_hex(a))
|
|
else:
|
|
text += " (%s, ***Failed***),\n" % repr(s)
|
|
text += "\n".join([
|
|
"]",
|
|
"payload += %s_payload(%s, bytes)" % (transfer, target),
|
|
"",
|
|
])
|
|
|
|
return text
|
|
|
|
|
|
###########################################################################
|
|
class PEDACmd(object):
|
|
"""
|
|
Class for PEDA commands that interact with GDB
|
|
"""
|
|
commands = []
|
|
def __init__(self):
|
|
# list of all available commands
|
|
self.commands = [c for c in dir(self) if callable(getattr(self, c)) and not c.startswith("_")]
|
|
|
|
##################
|
|
# Misc Utils #
|
|
##################
|
|
def _missing_argument(self):
|
|
"""
|
|
Raise exception for missing argument, for internal use
|
|
"""
|
|
text = "missing argument"
|
|
error_msg(text)
|
|
raise Exception(text)
|
|
|
|
def _is_running(self):
|
|
"""
|
|
Check if program is running, for internal use
|
|
"""
|
|
pid = peda.getpid()
|
|
if pid is None:
|
|
text = "not running"
|
|
warning_msg(text)
|
|
return None
|
|
#raise Exception(text)
|
|
else:
|
|
return pid
|
|
|
|
def reload(self, *arg):
|
|
"""
|
|
Reload PEDA sources, keep current options untouch
|
|
Usage:
|
|
MYNAME [name]
|
|
"""
|
|
(modname,) = normalize_argv(arg, 1)
|
|
# save current PEDA options
|
|
saved_opt = config.Option
|
|
peda_path = os.path.dirname(PEDAFILE) + "/lib/"
|
|
if not modname:
|
|
modname = "PEDA" # just for notification
|
|
ret = peda.execute("source %s" % PEDAFILE)
|
|
else:
|
|
if not modname.endswith(".py"):
|
|
modname = modname + ".py"
|
|
filepath = "%s/%s" % (peda_path, modname)
|
|
if os.path.exists(filepath):
|
|
ret = peda.execute("source %s" % filepath)
|
|
peda.execute("source %s" % PEDAFILE)
|
|
else:
|
|
ret = False
|
|
|
|
config.Option = saved_opt
|
|
if ret:
|
|
msg("%s reloaded!" % modname, "blue")
|
|
else:
|
|
msg("Failed to reload %s source from: %s" % (modname, peda_path))
|
|
return
|
|
|
|
def _get_helptext(self, *arg):
|
|
"""
|
|
Get the help text, for internal use by help command and other aliases
|
|
"""
|
|
|
|
(cmd,) = normalize_argv(arg, 1)
|
|
helptext = ""
|
|
if cmd is None:
|
|
helptext = red("PEDA", "bold") + blue(" - Python Exploit Development Assistance for GDB", "bold") + "\n"
|
|
helptext += "For latest update, check peda project page: %s\n" % green("https://github.com/longld/peda/")
|
|
helptext += "List of \"peda\" subcommands, type the subcommand to invoke it:\n"
|
|
i = 0
|
|
for cmd in self.commands:
|
|
if cmd.startswith("_"): continue # skip internal use commands
|
|
func = getattr(self, cmd)
|
|
helptext += "%s -- %s\n" % (cmd, green(trim(func.__doc__.strip("\n").splitlines()[0])))
|
|
helptext += "\nType \"help\" followed by subcommand for full documentation."
|
|
else:
|
|
if cmd in self.commands:
|
|
func = getattr(self, cmd)
|
|
lines = trim(func.__doc__).splitlines()
|
|
helptext += green(lines[0]) + "\n"
|
|
for line in lines[1:]:
|
|
if "Usage:" in line:
|
|
helptext += blue(line) + "\n"
|
|
else:
|
|
helptext += line + "\n"
|
|
else:
|
|
for c in self.commands:
|
|
if not c.startswith("_") and cmd in c:
|
|
func = getattr(self, c)
|
|
helptext += "%s -- %s\n" % (c, green(trim(func.__doc__.strip("\n").splitlines()[0])))
|
|
|
|
return helptext
|
|
|
|
def help(self, *arg):
|
|
"""
|
|
Print the usage manual for PEDA commands
|
|
Usage:
|
|
MYNAME
|
|
MYNAME command
|
|
"""
|
|
|
|
msg(self._get_helptext(*arg))
|
|
|
|
return
|
|
help.options = commands
|
|
|
|
def pyhelp(self, *arg):
|
|
"""
|
|
Wrapper for python built-in help
|
|
Usage:
|
|
MYNAME (enter interactive help)
|
|
MYNAME help_request
|
|
"""
|
|
(request,) = normalize_argv(arg, 1)
|
|
if request is None:
|
|
help()
|
|
return
|
|
|
|
peda_methods = ["%s" % c for c in dir(PEDA) if callable(getattr(PEDA, c)) and \
|
|
not c.startswith("_")]
|
|
|
|
if request in peda_methods:
|
|
request = "peda.%s" % request
|
|
try:
|
|
if request.lower().startswith("peda"):
|
|
request = eval(request)
|
|
help(request)
|
|
return
|
|
|
|
if "." in request:
|
|
module, _, function = request.rpartition('.')
|
|
if module:
|
|
module = module.split(".")[0]
|
|
__import__(module)
|
|
mod = sys.modules[module]
|
|
if function:
|
|
request = getattr(mod, function)
|
|
else:
|
|
request = mod
|
|
else:
|
|
mod = sys.modules['__main__']
|
|
request = getattr(mod, request)
|
|
|
|
# wrapper for python built-in help
|
|
help(request)
|
|
except: # fallback to built-in help
|
|
try:
|
|
help(request)
|
|
except Exception as e:
|
|
if config.Option.get("debug") == "on":
|
|
msg('Exception (%s): %s' % ('pyhelp', e), "red")
|
|
traceback.print_exc()
|
|
msg("no Python documentation found for '%s'" % request)
|
|
|
|
return
|
|
pyhelp.options = ["%s" % c for c in dir(PEDA) if callable(getattr(PEDA, c)) and \
|
|
not c.startswith("_")]
|
|
|
|
# show [option | args | env]
|
|
def show(self, *arg):
|
|
"""
|
|
Show various PEDA options and other settings
|
|
Usage:
|
|
MYNAME option [optname]
|
|
MYNAME (show all options)
|
|
MYNAME args
|
|
MYNAME env [envname]
|
|
"""
|
|
# show options
|
|
def _show_option(name=None):
|
|
if name is None:
|
|
name = ""
|
|
filename = peda.getfile()
|
|
if filename:
|
|
filename = os.path.basename(filename)
|
|
else:
|
|
filename = None
|
|
for (k, v) in sorted(config.Option.show(name).items()):
|
|
if filename and isinstance(v, str) and "#FILENAME#" in v:
|
|
v = v.replace("#FILENAME#", filename)
|
|
msg("%s = %s" % (k, repr(v)))
|
|
return
|
|
|
|
# show args
|
|
def _show_arg():
|
|
arg = peda.execute_redirect("show args")
|
|
arg = arg.split("started is ")[1][1:-3]
|
|
arg = (peda.string_to_argv(arg))
|
|
if not arg:
|
|
msg("No argument")
|
|
for (i, a) in enumerate(arg):
|
|
text = "arg[%d]: %s" % ((i+1), a if is_printable(a) else to_hexstr(a))
|
|
msg(text)
|
|
return
|
|
|
|
# show envs
|
|
def _show_env(name=None):
|
|
if name is None:
|
|
name = ""
|
|
env = peda.execute_redirect("show env")
|
|
for line in env.splitlines():
|
|
(k, v) = line.split("=", 1)
|
|
if k.startswith(name):
|
|
msg("%s = %s" % (k, v if is_printable(v) else to_hexstr(v)))
|
|
return
|
|
|
|
(opt, name) = normalize_argv(arg, 2)
|
|
|
|
if opt is None or opt.startswith("opt"):
|
|
_show_option(name)
|
|
elif opt.startswith("arg"):
|
|
_show_arg()
|
|
elif opt.startswith("env"):
|
|
_show_env(name)
|
|
else:
|
|
msg("Unknown show option: %s" % opt)
|
|
return
|
|
show.options = ["option", "arg", "env"]
|
|
|
|
# set [option | arg | env]
|
|
def set(self, *arg):
|
|
"""
|
|
Set various PEDA options and other settings
|
|
Usage:
|
|
MYNAME option name value
|
|
MYNAME arg string
|
|
MYNAME env name value
|
|
support input non-printable chars, e.g MYNAME env EGG "\\x90"*1000
|
|
"""
|
|
# set options
|
|
def _set_option(name, value):
|
|
if name in config.Option.options:
|
|
config.Option.set(name, value)
|
|
msg("%s = %s" % (name, repr(value)))
|
|
else:
|
|
msg("Unknown option: %s" % name)
|
|
return
|
|
|
|
# set args
|
|
def _set_arg(*arg):
|
|
cmd = "set args"
|
|
for a in arg:
|
|
try:
|
|
s = eval('%s' % a)
|
|
if isinstance(s, six.integer_types + six.string_types):
|
|
a = s
|
|
except:
|
|
pass
|
|
cmd += " '%s'" % a
|
|
peda.execute(cmd)
|
|
return
|
|
|
|
# set env
|
|
def _set_env(name, value):
|
|
env = peda.execute_redirect("show env")
|
|
cmd = "set env %s " % name
|
|
try:
|
|
value = eval('%s' % value)
|
|
except:
|
|
pass
|
|
cmd += '%s' % value
|
|
peda.execute(cmd)
|
|
|
|
return
|
|
|
|
(opt, name, value) = normalize_argv(arg, 3)
|
|
if opt is None:
|
|
self._missing_argument()
|
|
|
|
if opt.startswith("opt"):
|
|
if value is None:
|
|
self._missing_argument()
|
|
_set_option(name, value)
|
|
elif opt.startswith("arg"):
|
|
_set_arg(*arg[1:])
|
|
elif opt.startswith("env"):
|
|
_set_env(name, value)
|
|
else:
|
|
msg("Unknown set option: %s" % known_args.opt)
|
|
return
|
|
set.options = ["option", "arg", "env"]
|
|
|
|
def hexprint(self, *arg):
|
|
"""
|
|
Display hexified of data in memory
|
|
Usage:
|
|
MYNAME address (display 16 bytes from address)
|
|
MYNAME address count
|
|
MYNAME address /count (display "count" lines, 16-bytes each)
|
|
"""
|
|
(address, count) = normalize_argv(arg, 2)
|
|
if address is None:
|
|
self._missing_argument()
|
|
|
|
if count is None:
|
|
count = 16
|
|
|
|
if not to_int(count) and count.startswith("/"):
|
|
count = to_int(count[1:])
|
|
count = count * 16 if count else None
|
|
|
|
bytes_ = peda.dumpmem(address, address+count)
|
|
if bytes_ is None:
|
|
warning_msg("cannot retrieve memory content")
|
|
else:
|
|
hexstr = to_hexstr(bytes_)
|
|
linelen = 16 # display 16-bytes per line
|
|
i = 0
|
|
text = ""
|
|
while hexstr:
|
|
text += '%s : "%s"\n' % (blue(to_address(address+i*linelen)), hexstr[:linelen*4])
|
|
hexstr = hexstr[linelen*4:]
|
|
i += 1
|
|
pager(text)
|
|
|
|
return
|
|
|
|
def hexdump(self, *arg):
|
|
"""
|
|
Display hex/ascii dump of data in memory
|
|
Usage:
|
|
MYNAME address (dump 16 bytes from address)
|
|
MYNAME address count
|
|
MYNAME address /count (dump "count" lines, 16-bytes each)
|
|
"""
|
|
def ascii_char(ch):
|
|
if ord(ch) >= 0x20 and ord(ch) < 0x7e:
|
|
return chr(ord(ch)) # Ensure we return a str
|
|
else:
|
|
return "."
|
|
|
|
(address, count) = normalize_argv(arg, 2)
|
|
if address is None:
|
|
self._missing_argument()
|
|
|
|
if count is None:
|
|
count = 16
|
|
|
|
if not to_int(count) and count.startswith("/"):
|
|
count = to_int(count[1:])
|
|
count = count * 16 if count else None
|
|
|
|
bytes_ = peda.dumpmem(address, address+count)
|
|
if bytes_ is None:
|
|
warning_msg("cannot retrieve memory content")
|
|
else:
|
|
linelen = 16 # display 16-bytes per line
|
|
i = 0
|
|
text = ""
|
|
while bytes_:
|
|
buf = bytes_[:linelen]
|
|
hexbytes = " ".join(["%02x" % ord(c) for c in bytes_iterator(buf)])
|
|
asciibytes = "".join([ascii_char(c) for c in bytes_iterator(buf)])
|
|
text += '%s : %s %s\n' % (blue(to_address(address+i*linelen)), hexbytes.ljust(linelen*3), asciibytes)
|
|
bytes_ = bytes_[linelen:]
|
|
i += 1
|
|
pager(text)
|
|
|
|
return
|
|
|
|
def aslr(self, *arg):
|
|
"""
|
|
Show/set ASLR setting of GDB
|
|
Usage:
|
|
MYNAME [on|off]
|
|
"""
|
|
(option,) = normalize_argv(arg, 1)
|
|
if option is None:
|
|
out = peda.execute_redirect("show disable-randomization")
|
|
if not out:
|
|
warning_msg("ASLR setting is unknown or not available")
|
|
return
|
|
|
|
if "is off" in out:
|
|
msg("ASLR is %s" % green("ON"))
|
|
if "is on" in out:
|
|
msg("ASLR is %s" % red("OFF"))
|
|
else:
|
|
option = option.strip().lower()
|
|
if option in ["on", "off"]:
|
|
peda.execute("set disable-randomization %s" % ("off" if option == "on" else "on"))
|
|
|
|
return
|
|
|
|
def xprint(self, *arg):
|
|
"""
|
|
Extra support to GDB's print command
|
|
Usage:
|
|
MYNAME expression
|
|
"""
|
|
text = ""
|
|
exp = " ".join(list(arg))
|
|
m = re.search(".*\[(.*)\]|.*?s:(0x[^ ]*)", exp)
|
|
if m:
|
|
addr = peda.parse_and_eval(m.group(1))
|
|
if to_int(addr):
|
|
text += "[0x%x]: " % to_int(addr)
|
|
|
|
out = peda.parse_and_eval(exp)
|
|
if to_int(out):
|
|
chain = peda.examine_mem_reference(to_int(out))
|
|
text += format_reference_chain(chain)
|
|
msg(text)
|
|
return
|
|
|
|
def distance(self, *arg):
|
|
"""
|
|
Calculate distance between two addresses
|
|
Usage:
|
|
MYNAME address (calculate from current $SP to address)
|
|
MYNAME address1 address2
|
|
"""
|
|
(start, end) = normalize_argv(arg, 2)
|
|
if to_int(start) is None or (to_int(end) is None and not self._is_running()):
|
|
self._missing_argument()
|
|
|
|
sp = None
|
|
if end is None:
|
|
sp = peda.getreg("sp")
|
|
end = start
|
|
start = sp
|
|
|
|
dist = end - start
|
|
text = "From 0x%x%s to 0x%x: " % (start, " (SP)" if start == sp else "", end)
|
|
text += "%d bytes, %d dwords%s" % (dist, dist//4, " (+%d bytes)" % (dist%4) if (dist%4 != 0) else "")
|
|
msg(text)
|
|
|
|
return
|
|
|
|
def session(self, *arg):
|
|
"""
|
|
Save/restore a working gdb session to file as a script
|
|
Usage:
|
|
MYNAME save [filename]
|
|
MYNAME restore [filename]
|
|
"""
|
|
options = ["save", "restore", "autosave"]
|
|
(option, filename) = normalize_argv(arg, 2)
|
|
if option not in options:
|
|
self._missing_argument()
|
|
|
|
if not filename:
|
|
filename = peda.get_config_filename("session")
|
|
|
|
if option == "save":
|
|
if peda.save_session(filename):
|
|
msg("Saved GDB session to file %s" % filename)
|
|
else:
|
|
msg("Failed to save GDB session")
|
|
|
|
if option == "restore":
|
|
if peda.restore_session(filename):
|
|
msg("Restored GDB session from file %s" % filename)
|
|
else:
|
|
msg("Failed to restore GDB session")
|
|
|
|
if option == "autosave":
|
|
if config.Option.get("autosave") == "on":
|
|
peda.save_session(filename)
|
|
|
|
return
|
|
session.options = ["save", "restore"]
|
|
|
|
#################################
|
|
# Debugging Helper Commands #
|
|
#################################
|
|
def procinfo(self, *arg):
|
|
"""
|
|
Display various info from /proc/pid/
|
|
Usage:
|
|
MYNAME [pid]
|
|
"""
|
|
options = ["exe", "fd", "pid", "ppid", "uid", "gid"]
|
|
|
|
if peda.getos() != "Linux":
|
|
warning_msg("this command is only available on Linux")
|
|
|
|
(pid,) = normalize_argv(arg, 1)
|
|
|
|
if not pid:
|
|
pid = peda.getpid()
|
|
|
|
if not pid:
|
|
return
|
|
|
|
info = {}
|
|
try:
|
|
info["exe"] = os.path.realpath("/proc/%d/exe" % pid)
|
|
except:
|
|
warning_msg("cannot access /proc/%d/" % pid)
|
|
return
|
|
|
|
# fd list
|
|
info["fd"] = {}
|
|
fdlist = os.listdir("/proc/%d/fd" % pid)
|
|
for fd in fdlist:
|
|
rpath = os.readlink("/proc/%d/fd/%s" % (pid, fd))
|
|
sock = re.search("socket:\[(.*)\]", rpath)
|
|
if sock:
|
|
spath = execute_external_command("netstat -aen | grep %s" % sock.group(1))
|
|
if spath:
|
|
rpath = spath.strip()
|
|
info["fd"][to_int(fd)] = rpath
|
|
|
|
# uid/gid, pid, ppid
|
|
info["pid"] = pid
|
|
status = open("/proc/%d/status" % pid).read()
|
|
ppid = re.search("PPid:\s*([^\s]*)", status).group(1)
|
|
info["ppid"] = to_int(ppid) if ppid else -1
|
|
uid = re.search("Uid:\s*([^\n]*)", status).group(1)
|
|
info["uid"] = [to_int(id) for id in uid.split()]
|
|
gid = re.search("Gid:\s*([^\n]*)", status).group(1)
|
|
info["gid"] = [to_int(id) for id in gid.split()]
|
|
|
|
for opt in options:
|
|
if opt == "fd":
|
|
for (fd, path) in info[opt].items():
|
|
msg("fd[%d] -> %s" % (fd, path))
|
|
else:
|
|
msg("%s = %s" % (opt, info[opt]))
|
|
return
|
|
|
|
# getfile()
|
|
def getfile(self):
|
|
"""
|
|
Get exec filename of current debugged process
|
|
Usage:
|
|
MYNAME
|
|
"""
|
|
filename = peda.getfile()
|
|
if filename == None:
|
|
msg("No file specified")
|
|
else:
|
|
msg(filename)
|
|
return
|
|
|
|
# getpid()
|
|
def getpid(self):
|
|
"""
|
|
Get PID of current debugged process
|
|
Usage:
|
|
MYNAME
|
|
"""
|
|
pid = self._is_running()
|
|
msg(pid)
|
|
return
|
|
|
|
# disassemble()
|
|
def pdisass(self, *arg):
|
|
"""
|
|
Format output of gdb disassemble command with colors
|
|
Usage:
|
|
MYNAME "args for gdb disassemble command"
|
|
MYNAME address /NN: equivalent to "x/NNi address"
|
|
"""
|
|
(address, fmt_count) = normalize_argv(arg, 2)
|
|
if isinstance(fmt_count, str) and fmt_count.startswith("/"):
|
|
count = to_int(fmt_count[1:])
|
|
if not count or to_int(address) is None:
|
|
self._missing_argument()
|
|
else:
|
|
code = peda.get_disasm(address, count)
|
|
else:
|
|
code = peda.disassemble(*arg)
|
|
msg(format_disasm_code(code))
|
|
|
|
return
|
|
|
|
# disassemble_around
|
|
def nearpc(self, *arg):
|
|
"""
|
|
Disassemble instructions nearby current PC or given address
|
|
Usage:
|
|
MYNAME [count]
|
|
MYNAME address [count]
|
|
count is maximum 256
|
|
"""
|
|
(address, count) = normalize_argv(arg, 2)
|
|
address = to_int(address)
|
|
|
|
count = to_int(count)
|
|
if address is not None and address < 0x40000:
|
|
count = address
|
|
address = None
|
|
|
|
if address is None:
|
|
address = peda.getreg("pc")
|
|
|
|
if count is None:
|
|
code = peda.disassemble_around(address)
|
|
else:
|
|
code = peda.disassemble_around(address, count)
|
|
|
|
if code:
|
|
msg(format_disasm_code(code, address))
|
|
else:
|
|
error_msg("invalid $pc address or instruction count")
|
|
return
|
|
|
|
def waitfor(self, *arg):
|
|
"""
|
|
Try to attach to new forked process; mimic "attach -waitfor"
|
|
Usage:
|
|
MYNAME [cmdname]
|
|
MYNAME [cmdname] -c (auto continue after attached)
|
|
"""
|
|
(name, opt) = normalize_argv(arg, 2)
|
|
if name == "-c":
|
|
opt = name
|
|
name = None
|
|
|
|
if name is None:
|
|
filename = peda.getfile()
|
|
if filename is None:
|
|
warning_msg("please specify the file to debug or process name to attach")
|
|
return
|
|
else:
|
|
name = os.path.basename(filename)
|
|
|
|
msg("Trying to attach to new forked process (%s), Ctrl-C to stop..." % name)
|
|
cmd = "ps axo pid,command | grep %s | grep -v grep" % name
|
|
getpids = []
|
|
out = execute_external_command(cmd)
|
|
for line in out.splitlines():
|
|
getpids += [line.split()[0].strip()]
|
|
|
|
while True:
|
|
found = 0
|
|
out = execute_external_command(cmd)
|
|
for line in out.splitlines():
|
|
line = line.split()
|
|
pid = line[0].strip()
|
|
cmdname = line[1].strip()
|
|
if name not in cmdname: continue
|
|
if pid not in getpids:
|
|
found = 1
|
|
break
|
|
|
|
if found == 1:
|
|
msg("Attching to pid: %s, cmdname: %s" % (pid, cmdname))
|
|
if peda.getpid():
|
|
peda.execute("detach")
|
|
out = peda.execute_redirect("attach %s" % pid)
|
|
msg(out)
|
|
out = peda.execute_redirect("file %s" % cmdname) # reload symbol file
|
|
msg(out)
|
|
if opt == "-c":
|
|
peda.execute("continue")
|
|
return
|
|
time.sleep(0.5)
|
|
return
|
|
|
|
def pltbreak(self, *arg):
|
|
"""
|
|
Set breakpoint at PLT functions match name regex
|
|
Usage:
|
|
MYNAME [name]
|
|
"""
|
|
(name,) = normalize_argv(arg, 1)
|
|
if not name:
|
|
name = ""
|
|
headers = peda.elfheader()
|
|
end = headers[".bss"]
|
|
symbols = peda.elfsymbol(name)
|
|
if len(symbols) == 0:
|
|
msg("File not specified or PLT symbols not found")
|
|
return
|
|
else:
|
|
# Traverse symbols in order to have more predictable output
|
|
for symname in sorted(symbols):
|
|
if "plt" not in symname: continue
|
|
if name in symname: # fixme(longld) bounds checking?
|
|
line = peda.execute_redirect("break %s" % symname)
|
|
msg("%s (%s)" % (line.strip("\n"), symname))
|
|
return
|
|
|
|
def xrefs(self, *arg):
|
|
"""
|
|
Search for all call/data access references to a function/variable
|
|
Usage:
|
|
MYNAME pattern
|
|
MYNAME pattern file/mapname
|
|
"""
|
|
(search, filename) = normalize_argv(arg, 2)
|
|
if search is None:
|
|
search = "" # search for all call references
|
|
else:
|
|
search = arg[0]
|
|
|
|
if filename is not None: # get full path to file if mapname is provided
|
|
vmap = peda.get_vmmap(filename)
|
|
if vmap:
|
|
filename = vmap[0][3]
|
|
|
|
result = peda.xrefs(search, filename)
|
|
if result:
|
|
if search != "":
|
|
msg("All references to '%s':" % search)
|
|
else:
|
|
msg("All call references")
|
|
for (addr, code) in result:
|
|
msg("%s" % (code))
|
|
else:
|
|
msg("Not found")
|
|
return
|
|
|
|
def deactive(self, *arg):
|
|
"""
|
|
Bypass a function by ignoring its execution (eg sleep/alarm)
|
|
Usage:
|
|
MYNAME function
|
|
MYNAME function del (re-active)
|
|
"""
|
|
(function, action) = normalize_argv(arg, 2)
|
|
if function is None:
|
|
self._missing_argument()
|
|
|
|
if to_int(function):
|
|
function = "0x%x" % function
|
|
|
|
bnum = "$deactive_%s_bnum" % function
|
|
if action and "del" in action:
|
|
peda.execute("delete %s" % bnum)
|
|
peda.execute("set %s = \"void\"" % bnum)
|
|
msg("'%s' re-activated" % function)
|
|
return
|
|
|
|
if "void" not in peda.execute_redirect("p %s" % bnum):
|
|
out = peda.execute_redirect("info breakpoints %s" % bnum)
|
|
if out:
|
|
msg("Already deactivated '%s'" % function)
|
|
msg(out)
|
|
return
|
|
else:
|
|
peda.execute("set %s = \"void\"" % bnum)
|
|
|
|
(arch, bits) = peda.getarch()
|
|
if not function.startswith("0x"): # named function
|
|
symbol = peda.elfsymbol(function)
|
|
if not symbol:
|
|
warning_msg("cannot retrieve info of function '%s'" % function)
|
|
return
|
|
peda.execute("break *0x%x" % symbol[function + "@plt"])
|
|
|
|
else: # addressed function
|
|
peda.execute("break *%s" % function)
|
|
|
|
peda.execute("set %s = $bpnum" % bnum)
|
|
tmpfd = tmpfile()
|
|
if "i386" in arch:
|
|
tmpfd.write("\n".join([
|
|
"commands $bpnum",
|
|
"silent",
|
|
"set $eax = 0",
|
|
"return",
|
|
"continue",
|
|
"end"]))
|
|
if "64" in arch:
|
|
tmpfd.write("\n".join([
|
|
"commands $bpnum",
|
|
"silent",
|
|
"set $rax = 0",
|
|
"return",
|
|
"continue",
|
|
"end"]))
|
|
tmpfd.flush()
|
|
peda.execute("source %s" % tmpfd.name)
|
|
tmpfd.close()
|
|
out = peda.execute_redirect("info breakpoints %s" % bnum)
|
|
if out:
|
|
msg("'%s' deactivated" % function)
|
|
msg(out)
|
|
return
|
|
|
|
def unptrace(self, *arg):
|
|
"""
|
|
Disable anti-ptrace detection
|
|
Usage:
|
|
MYNAME
|
|
MYNAME del
|
|
"""
|
|
(action,) = normalize_argv(arg, 1)
|
|
|
|
self.deactive("ptrace", action)
|
|
|
|
if not action and "void" in peda.execute_redirect("p $deactive_ptrace_bnum"):
|
|
# cannot deactive vi plt entry, try syscall method
|
|
msg("Try to patch 'ptrace' via syscall")
|
|
peda.execute("catch syscall ptrace")
|
|
peda.execute("set $deactive_ptrace_bnum = $bpnum")
|
|
tmpfd = tmpfile()
|
|
(arch, bits) = peda.getarch()
|
|
if "i386" in arch:
|
|
tmpfd.write("\n".join([
|
|
"commands $bpnum",
|
|
"silent",
|
|
"if (*(int*)($esp+4) == 0 || $ebx == 0)",
|
|
" set $eax = 0",
|
|
"end",
|
|
"continue",
|
|
"end"]))
|
|
if "64" in arch:
|
|
tmpfd.write("\n".join([
|
|
"commands $bpnum",
|
|
"silent",
|
|
"if ($rdi == 0)",
|
|
" set $rax = 0",
|
|
"end",
|
|
"continue",
|
|
"end"]))
|
|
tmpfd.flush()
|
|
peda.execute("source %s" % tmpfd.name)
|
|
tmpfd.close()
|
|
out = peda.execute_redirect("info breakpoints $deactive_ptrace_bnum")
|
|
if out:
|
|
msg("'ptrace' deactivated")
|
|
msg(out)
|
|
return
|
|
|
|
# get_function_args()
|
|
def dumpargs(self, *arg):
|
|
"""
|
|
Display arguments passed to a function when stopped at a call instruction
|
|
Usage:
|
|
MYNAME [count]
|
|
count: force to display "count args" instead of guessing
|
|
"""
|
|
|
|
(count,) = normalize_argv(arg, 1)
|
|
if not self._is_running():
|
|
return
|
|
|
|
args = peda.get_function_args(count)
|
|
if args:
|
|
msg("Guessed arguments:")
|
|
for (i, a) in enumerate(args):
|
|
chain = peda.examine_mem_reference(a)
|
|
msg("arg[%d]: %s" % (i, format_reference_chain(chain)))
|
|
else:
|
|
msg("No argument")
|
|
|
|
return
|
|
|
|
def xuntil(self, *arg):
|
|
"""
|
|
Continue execution until an address or function
|
|
Usage:
|
|
MYNAME address | function
|
|
"""
|
|
(address,) = normalize_argv(arg, 1)
|
|
if to_int(address) is None:
|
|
peda.execute("tbreak %s" % address)
|
|
else:
|
|
peda.execute("tbreak *0x%x" % address)
|
|
pc = peda.getreg("pc")
|
|
if pc is None:
|
|
peda.execute("run")
|
|
else:
|
|
peda.execute("continue")
|
|
return
|
|
|
|
def goto(self, *arg):
|
|
"""
|
|
Continue execution at an address
|
|
Usage:
|
|
MYNAME address
|
|
"""
|
|
(address,) = normalize_argv(arg, 1)
|
|
if to_int(address) is None:
|
|
self._missing_argument()
|
|
|
|
peda.execute("set $pc = 0x%x" % address)
|
|
peda.execute("stop")
|
|
return
|
|
|
|
def skipi(self, *arg):
|
|
"""
|
|
Skip execution of next count instructions
|
|
Usage:
|
|
MYNAME [count]
|
|
"""
|
|
(count,) = normalize_argv(arg, 1)
|
|
if to_int(count) is None:
|
|
count = 1
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
next_code = peda.next_inst(peda.getreg("pc"), count)
|
|
if not next_code:
|
|
warning_msg("failed to get next instructions")
|
|
return
|
|
last_addr = next_code[-1][0]
|
|
peda.execute("set $pc = 0x%x" % last_addr)
|
|
peda.execute("stop")
|
|
return
|
|
|
|
def start(self, *arg):
|
|
"""
|
|
Start debugged program and stop at most convenient entry
|
|
Usage:
|
|
MYNAME
|
|
"""
|
|
entries = ["main"]
|
|
main_addr = peda.main_entry()
|
|
if main_addr:
|
|
entries += ["*0x%x" % main_addr]
|
|
entries += ["__libc_start_main@plt"]
|
|
entries += ["_start"]
|
|
entries += ["_init"]
|
|
|
|
started = 0
|
|
for e in entries:
|
|
out = peda.execute_redirect("tbreak %s" % e)
|
|
if out and "breakpoint" in out:
|
|
peda.execute("run %s" % ' '.join(arg))
|
|
started = 1
|
|
break
|
|
|
|
if not started: # try ELF entry point or just "run" as the last resort
|
|
elf_entry = peda.elfentry()
|
|
if elf_entry:
|
|
out = peda.execute_redirect("tbreak *%s" % elf_entry)
|
|
|
|
peda.execute("run")
|
|
|
|
return
|
|
|
|
# stepuntil()
|
|
def stepuntil(self, *arg):
|
|
"""
|
|
Step until a desired instruction in specific memory range
|
|
Usage:
|
|
MYNAME "inst1,inst2" (step to next inst in binary)
|
|
MYNAME "inst1,inst2" mapname1,mapname2
|
|
"""
|
|
(insts, mapname) = normalize_argv(arg, 2)
|
|
if insts is None:
|
|
self._missing_argument()
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
peda.save_user_command("hook-stop") # disable hook-stop to speedup
|
|
msg("Stepping through, Ctrl-C to stop...")
|
|
result = peda.stepuntil(insts, mapname)
|
|
peda.restore_user_command("hook-stop")
|
|
|
|
if result:
|
|
peda.execute("stop")
|
|
return
|
|
|
|
# wrapper for stepuntil("call")
|
|
def nextcall(self, *arg):
|
|
"""
|
|
Step until next 'call' instruction in specific memory range
|
|
Usage:
|
|
MYNAME [keyword] [mapname1,mapname2]
|
|
"""
|
|
(keyword, mapname) = normalize_argv(arg, 2)
|
|
|
|
if keyword:
|
|
self.stepuntil("call.*%s" % keyword, mapname)
|
|
else:
|
|
self.stepuntil("call", mapname)
|
|
return
|
|
|
|
# wrapper for stepuntil("j")
|
|
def nextjmp(self, *arg):
|
|
"""
|
|
Step until next 'j*' instruction in specific memory range
|
|
Usage:
|
|
MYNAME [keyword] [mapname1,mapname2]
|
|
"""
|
|
(keyword, mapname) = normalize_argv(arg, 2)
|
|
|
|
if keyword:
|
|
self.stepuntil("j.*%s" % keyword, mapname)
|
|
else:
|
|
self.stepuntil("j", mapname)
|
|
return
|
|
|
|
#stepuntil()
|
|
def tracecall(self, *arg):
|
|
"""
|
|
Trace function calls made by the program
|
|
Usage:
|
|
MYNAME ["func1,func2"] [mapname1,mapname2]
|
|
MYNAME ["-func1,func2"] [mapname1,mapname2] (inverse)
|
|
default is to trace internal calls made by the program
|
|
"""
|
|
(funcs, mapname) = normalize_argv(arg, 2)
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
if not mapname:
|
|
mapname = "binary"
|
|
|
|
fnames = [""]
|
|
if funcs:
|
|
if to_int(funcs):
|
|
funcs = "0x%x" % funcs
|
|
fnames = funcs.replace(",", " ").split()
|
|
for (idx, fn) in enumerate(fnames):
|
|
if to_int(fn):
|
|
fnames[idx] = "0x%x" % to_int(fn)
|
|
|
|
inverse = 0
|
|
for (idx, fn) in enumerate(fnames):
|
|
if fn.startswith("-"): # inverse trace
|
|
fnames[idx] = fn[1:]
|
|
inverse = 1
|
|
|
|
binname = peda.getfile()
|
|
logname = peda.get_config_filename("tracelog")
|
|
|
|
if mapname is None:
|
|
mapname = binname
|
|
|
|
peda.save_user_command("hook-stop") # disable hook-stop to speedup
|
|
msg("Tracing calls %s '%s', Ctrl-C to stop..." % ("match" if not inverse else "not match", ",".join(fnames)))
|
|
prev_depth = peda.backtrace_depth(peda.getreg("sp"))
|
|
|
|
logfd = open(logname, "w")
|
|
while True:
|
|
result = peda.stepuntil("call", mapname, prev_depth)
|
|
if result is None:
|
|
break
|
|
(call_depth, code) = result
|
|
prev_depth += call_depth
|
|
if not code.startswith("=>"):
|
|
break
|
|
|
|
if not inverse:
|
|
matched = False
|
|
for fn in fnames:
|
|
fn = fn.strip()
|
|
if re.search(fn, code.split(":\t")[-1]):
|
|
matched = True
|
|
break
|
|
else:
|
|
matched = True
|
|
for fn in fnames:
|
|
fn = fn.strip()
|
|
if re.search(fn, code.split(":\t")[-1]):
|
|
matched = False
|
|
break
|
|
|
|
if matched:
|
|
code = format_disasm_code(code)
|
|
msg("%s%s%s" % (" "*(prev_depth-1), " dep:%02d " % (prev_depth-1), colorize(code.strip())), teefd=logfd)
|
|
args = peda.get_function_args()
|
|
if args:
|
|
for (i, a) in enumerate(args):
|
|
chain = peda.examine_mem_reference(a)
|
|
text = "%s |-- arg[%d]: %s" % (" "*(prev_depth-1), i, format_reference_chain(chain))
|
|
msg(text, teefd=logfd)
|
|
|
|
msg(code, "red")
|
|
peda.restore_user_command("hook-stop")
|
|
if "STOP" not in peda.get_status():
|
|
peda.execute("stop")
|
|
logfd.close()
|
|
msg("Saved trace information in file %s, view with 'less -r file'" % logname)
|
|
return
|
|
|
|
# stepuntil()
|
|
def traceinst(self, *arg):
|
|
"""
|
|
Trace specific instructions executed by the program
|
|
Usage:
|
|
MYNAME ["inst1,inst2"] [mapname1,mapname2]
|
|
MYNAME count (trace execution of next count instrcutions)
|
|
default is to trace instructions inside the program
|
|
"""
|
|
(insts, mapname) = normalize_argv(arg, 2)
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
if not mapname:
|
|
mapname = "binary"
|
|
|
|
instlist = [".*"]
|
|
count = -1
|
|
if insts:
|
|
if to_int(insts):
|
|
count = insts
|
|
else:
|
|
instlist = insts.replace(",", " ").split()
|
|
|
|
binname = peda.getfile()
|
|
logname = peda.get_config_filename("tracelog")
|
|
|
|
if mapname is None:
|
|
mapname = binname
|
|
|
|
peda.save_user_command("hook-stop") # disable hook-stop to speedup
|
|
msg("Tracing instructions match '%s', Ctrl-C to stop..." % (",".join(instlist)))
|
|
prev_depth = peda.backtrace_depth(peda.getreg("sp"))
|
|
logfd = open(logname, "w")
|
|
|
|
p = re.compile(".*?:\s*[^ ]*\s*([^,]*),(.*)")
|
|
while count:
|
|
result = peda.stepuntil(",".join(instlist), mapname, prev_depth)
|
|
if result is None:
|
|
break
|
|
(call_depth, code) = result
|
|
prev_depth += call_depth
|
|
if not code.startswith("=>"):
|
|
break
|
|
|
|
# special case for JUMP inst
|
|
prev_code = ""
|
|
if re.search("j[^m]", code.split(":\t")[-1].split()[0]):
|
|
prev_insts = peda.prev_inst(peda.getreg("pc"))
|
|
if prev_insts:
|
|
prev_code = "0x%x:%s" % prev_insts[0]
|
|
msg("%s%s%s" % (" "*(prev_depth-1), " dep:%02d " % (prev_depth-1), prev_code), teefd=logfd)
|
|
|
|
text = "%s%s%s" % (" "*(prev_depth-1), " dep:%02d " % (prev_depth-1), code.strip())
|
|
msg(text, teefd=logfd)
|
|
|
|
if re.search("call", code.split(":\t")[-1].split()[0]):
|
|
args = peda.get_function_args()
|
|
if args:
|
|
for (i, a) in enumerate(args):
|
|
chain = peda.examine_mem_reference(a)
|
|
text = "%s |-- arg[%d]: %s" % (" "*(prev_depth-1), i, format_reference_chain(chain))
|
|
msg(text, teefd=logfd)
|
|
|
|
# get registers info if any
|
|
(arch, bits) = peda.getarch()
|
|
code = code.split("#")[0].strip("=>")
|
|
if prev_code:
|
|
m = p.search(prev_code)
|
|
else:
|
|
m = p.search(code)
|
|
|
|
if m:
|
|
for op in m.groups():
|
|
if op.startswith("0x"): continue
|
|
v = to_int(peda.parse_and_eval(op))
|
|
chain = peda.examine_mem_reference(v)
|
|
text = "%s |-- %03s: %s" % (" "*(prev_depth-1), op, format_reference_chain(chain))
|
|
msg(text, teefd=logfd)
|
|
|
|
count -= 1
|
|
|
|
msg(code, "red")
|
|
peda.restore_user_command("hook-stop")
|
|
logfd.close()
|
|
msg("Saved trace information in file %s, view with 'less -r file'" % logname)
|
|
return
|
|
|
|
def profile(self, *arg):
|
|
"""
|
|
Simple profiling to count executed instructions in the program
|
|
Usage:
|
|
MYNAME count [keyword]
|
|
default is to count instructions inside the program only
|
|
count = 0: run until end of execution
|
|
keyword: only display stats for instructions matched it
|
|
"""
|
|
(count, keyword) = normalize_argv(arg, 2)
|
|
|
|
if count is None:
|
|
self._missing_argument()
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
if keyword is None or keyword == "all":
|
|
keyword = ""
|
|
|
|
keyword = keyword.replace(" ", "").split(",")
|
|
|
|
peda.save_user_command("hook-stop") # disable hook-stop to speedup
|
|
msg("Stepping %s instructions, Ctrl-C to stop..." % ("%d" % count if count else "all"))
|
|
|
|
if count == 0:
|
|
count = -1
|
|
stats = {}
|
|
total = 0
|
|
binmap = peda.get_vmmap("binary")
|
|
try:
|
|
while count != 0:
|
|
pc = peda.getreg("pc")
|
|
if not peda.is_address(pc):
|
|
break
|
|
code = peda.get_disasm(pc)
|
|
if not code:
|
|
break
|
|
if peda.is_address(pc, binmap):
|
|
for k in keyword:
|
|
if k in code.split(":\t")[-1]:
|
|
code = code.strip("=>").strip()
|
|
stats.setdefault(code, 0)
|
|
stats[code] += 1
|
|
break
|
|
peda.execute_redirect("stepi", silent=True)
|
|
else:
|
|
peda.execute_redirect("stepi", silent=True)
|
|
peda.execute_redirect("finish", silent=True)
|
|
count -= 1
|
|
total += 1
|
|
except:
|
|
pass
|
|
|
|
peda.restore_user_command("hook-stop")
|
|
text = "Executed %d instructions\n" % total
|
|
text += "%s %s\n" % (blue("Run-count", "bold"), blue("Instruction", "bold"))
|
|
for (code, count) in sorted(stats.items(), key = lambda x: x[1], reverse=True):
|
|
text += "%8d: %s\n" % (count, code)
|
|
pager(text)
|
|
|
|
return
|
|
|
|
@msg.bufferize
|
|
def context_register(self, *arg):
|
|
"""
|
|
Display register information of current execution context
|
|
Usage:
|
|
MYNAME
|
|
"""
|
|
if not self._is_running():
|
|
return
|
|
|
|
pc = peda.getreg("pc")
|
|
# display register info
|
|
msg("[%s]" % "registers".center(78, "-"), "blue")
|
|
self.xinfo("register")
|
|
|
|
return
|
|
|
|
@msg.bufferize
|
|
def context_code(self, *arg):
|
|
"""
|
|
Display nearby disassembly at $PC of current execution context
|
|
Usage:
|
|
MYNAME [linecount]
|
|
"""
|
|
(count,) = normalize_argv(arg, 1)
|
|
|
|
if count is None:
|
|
count = 8
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
pc = peda.getreg("pc")
|
|
if peda.is_address(pc):
|
|
inst = peda.get_disasm(pc)
|
|
else:
|
|
inst = None
|
|
|
|
text = blue("[%s]" % "code".center(78, "-"))
|
|
msg(text)
|
|
if inst: # valid $PC
|
|
text = ""
|
|
opcode = inst.split(":\t")[-1].split()[0]
|
|
# stopped at function call
|
|
if "call" in opcode:
|
|
text += peda.disassemble_around(pc, count)
|
|
msg(format_disasm_code(text, pc))
|
|
self.dumpargs()
|
|
# stopped at jump
|
|
elif "j" in opcode:
|
|
jumpto = peda.testjump(inst)
|
|
if jumpto: # JUMP is taken
|
|
code = peda.disassemble_around(pc, count)
|
|
code = code.splitlines()
|
|
pc_idx = 999
|
|
for (idx, line) in enumerate(code):
|
|
if ("0x%x" % pc) in line.split(":")[0]:
|
|
pc_idx = idx
|
|
if idx <= pc_idx:
|
|
text += line + "\n"
|
|
else:
|
|
text += " | %s\n" % line.strip()
|
|
text = format_disasm_code(text, pc) + "\n"
|
|
text += " |->"
|
|
code = peda.get_disasm(jumpto, count//2)
|
|
if not code:
|
|
code = " Cannot evaluate jump destination\n"
|
|
|
|
code = code.splitlines()
|
|
text += red(code[0]) + "\n"
|
|
for line in code[1:]:
|
|
text += " %s\n" % line.strip()
|
|
text += red("JUMP is taken".rjust(79))
|
|
else: # JUMP is NOT taken
|
|
text += format_disasm_code(peda.disassemble_around(pc, count), pc)
|
|
text += "\n" + green("JUMP is NOT taken".rjust(79))
|
|
|
|
msg(text.rstrip())
|
|
# stopped at other instructions
|
|
else:
|
|
text += peda.disassemble_around(pc, count)
|
|
msg(format_disasm_code(text, pc))
|
|
else: # invalid $PC
|
|
msg("Invalid $PC address: 0x%x" % pc, "red")
|
|
|
|
return
|
|
|
|
@msg.bufferize
|
|
def context_stack(self, *arg):
|
|
"""
|
|
Display stack of current execution context
|
|
Usage:
|
|
MYNAME [linecount]
|
|
"""
|
|
(count,) = normalize_argv(arg, 1)
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
text = blue("[%s]" % "stack".center(78, "-"))
|
|
msg(text)
|
|
sp = peda.getreg("sp")
|
|
if peda.is_address(sp):
|
|
self.telescope(sp, count)
|
|
else:
|
|
msg("Invalid $SP address: 0x%x" % sp, "red")
|
|
|
|
return
|
|
|
|
def context(self, *arg):
|
|
"""
|
|
Display various information of current execution context
|
|
Usage:
|
|
MYNAME [reg,code,stack,all] [code/stack length]
|
|
"""
|
|
|
|
(opt, count) = normalize_argv(arg, 2)
|
|
|
|
if to_int(count) is None:
|
|
count = 8
|
|
if opt is None:
|
|
opt = config.Option.get("context")
|
|
if opt == "all":
|
|
opt = "register,code,stack"
|
|
|
|
opt = opt.replace(" ", "").split(",")
|
|
|
|
if not opt:
|
|
return
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
clearscr = config.Option.get("clearscr")
|
|
if clearscr == "on":
|
|
clearscreen()
|
|
|
|
status = peda.get_status()
|
|
# display registers
|
|
if "reg" in opt or "register" in opt:
|
|
self.context_register()
|
|
|
|
# display assembly code
|
|
if "code" in opt:
|
|
self.context_code(count)
|
|
|
|
# display stack content, forced in case SIGSEGV
|
|
if "stack" in opt or "SIGSEGV" in status:
|
|
self.context_stack(count)
|
|
msg("[%s]" % ("-"*78), "blue")
|
|
msg("Legend: %s, %s, %s, value" % (red("code"), blue("data"), green("rodata")))
|
|
|
|
# display stopped reason
|
|
if "SIG" in status:
|
|
msg("Stopped reason: %s" % red(status))
|
|
|
|
return
|
|
|
|
def breakrva(self, *arg):
|
|
"""
|
|
Set breakpoint by Relative Virtual Address (RVA)
|
|
Usage:
|
|
MYNAME rva
|
|
MYNAME rva module_name (e.g binary, shared module name)
|
|
"""
|
|
(rva, module) = normalize_argv(arg, 2)
|
|
if rva is None or not to_int(rva):
|
|
self._missing_argument()
|
|
if module is None:
|
|
module = 'binary'
|
|
|
|
binmap = peda.get_vmmap(module)
|
|
if len(binmap) == 0:
|
|
msg("No module matches '%s'" % module)
|
|
else:
|
|
base_address = binmap[0][0]
|
|
peda.set_breakpoint(base_address+rva)
|
|
return
|
|
|
|
#################################
|
|
# Memory Operation Commands #
|
|
#################################
|
|
# get_vmmap()
|
|
def vmmap(self, *arg):
|
|
"""
|
|
Get virtual mapping address ranges of section(s) in debugged process
|
|
Usage:
|
|
MYNAME [mapname] (e.g binary, all, libc, stack)
|
|
MYNAME address (find mapname contains this address)
|
|
MYNAME (equiv to cat /proc/pid/maps)
|
|
"""
|
|
|
|
(mapname,) = normalize_argv(arg, 1)
|
|
if not self._is_running():
|
|
maps = peda.get_vmmap()
|
|
elif to_int(mapname) is None:
|
|
maps = peda.get_vmmap(mapname)
|
|
else:
|
|
addr = to_int(mapname)
|
|
maps = []
|
|
allmaps = peda.get_vmmap()
|
|
if allmaps is not None:
|
|
for (start, end, perm, name) in allmaps:
|
|
if addr >= start and addr < end:
|
|
maps += [(start, end, perm, name)]
|
|
|
|
if maps is not None and len(maps) > 0:
|
|
l = 10 if peda.intsize() == 4 else 18
|
|
msg("%s %s %s\t%s" % ("Start".ljust(l, " "), "End".ljust(l, " "), "Perm", "Name"), "blue", "bold")
|
|
for (start, end, perm, name) in maps:
|
|
color = "red" if "rwx" in perm else None
|
|
msg("%s %s %s\t%s" % (to_address(start).ljust(l, " "), to_address(end).ljust(l, " "), perm, name), color)
|
|
else:
|
|
warning_msg("not found or cannot access procfs")
|
|
return
|
|
|
|
# writemem()
|
|
def patch(self, *arg):
|
|
"""
|
|
Patch memory start at an address with string/hexstring/int
|
|
Usage:
|
|
MYNAME address (multiple lines input)
|
|
MYNAME address "string"
|
|
MYNAME from_address to_address "string"
|
|
MYNAME (will patch at current $pc)
|
|
"""
|
|
|
|
(address, data, byte) = normalize_argv(arg, 3)
|
|
address = to_int(address)
|
|
end_address = None
|
|
if address is None:
|
|
address = peda.getreg("pc")
|
|
|
|
if byte is not None and to_int(data) is not None:
|
|
end_address, data = to_int(data), byte
|
|
if end_address < address:
|
|
address, end_address = end_address, address
|
|
|
|
if data is None:
|
|
data = ""
|
|
while True:
|
|
line = input("patch> ")
|
|
if line.strip() == "": continue
|
|
if line == "end":
|
|
break
|
|
user_input = line.strip()
|
|
if user_input.startswith("0x"):
|
|
data += hex2str(user_input)
|
|
else:
|
|
data += eval("%s" % user_input)
|
|
|
|
if to_int(data) is not None:
|
|
data = hex2str(to_int(data), peda.intsize())
|
|
|
|
data = to_binary_string(data)
|
|
data = data.replace(b"\\\\", b"\\")
|
|
if end_address:
|
|
data *= (end_address-address + 1) // len(data)
|
|
bytes_ = peda.writemem(address, data)
|
|
if bytes_ >= 0:
|
|
msg("Written %d bytes to 0x%x" % (bytes_, address))
|
|
else:
|
|
warning_msg("Failed to patch memory, try 'set write on' first for offline patching")
|
|
return
|
|
|
|
# dumpmem()
|
|
def dumpmem(self, *arg):
|
|
"""
|
|
Dump content of a memory region to raw binary file
|
|
Usage:
|
|
MYNAME file start end
|
|
MYNAME file mapname
|
|
"""
|
|
(filename, start, end) = normalize_argv(arg, 3)
|
|
if end is not None and to_int(end):
|
|
if end < start:
|
|
start, end = end, start
|
|
ret = peda.execute("dump memory %s 0x%x 0x%x" % (filename, start, end))
|
|
if not ret:
|
|
warning_msg("failed to dump memory")
|
|
else:
|
|
msg("Dumped %d bytes to '%s'" % (end-start, filename))
|
|
elif start is not None: # dump by mapname
|
|
maps = peda.get_vmmap(start)
|
|
if maps:
|
|
fd = open(filename, "wb")
|
|
count = 0
|
|
for (start, end, _, _) in maps:
|
|
mem = peda.dumpmem(start, end)
|
|
if mem is None: # nullify unreadable memory
|
|
mem = "\x00"*(end-start)
|
|
fd.write(mem)
|
|
count += end - start
|
|
fd.close()
|
|
msg("Dumped %d bytes to '%s'" % (count, filename))
|
|
else:
|
|
warning_msg("invalid mapname")
|
|
else:
|
|
self._missing_argument()
|
|
|
|
return
|
|
|
|
# loadmem()
|
|
def loadmem(self, *arg):
|
|
"""
|
|
Load contents of a raw binary file to memory
|
|
Usage:
|
|
MYNAME file address [size]
|
|
"""
|
|
mem = ""
|
|
(filename, address, size) = normalize_argv(arg, 3)
|
|
address = to_int(address)
|
|
size = to_int(size)
|
|
if filename is not None:
|
|
try:
|
|
mem = open(filename, "rb").read()
|
|
except:
|
|
pass
|
|
if mem == "":
|
|
error_msg("cannot read data or filename is empty")
|
|
return
|
|
if size is not None and size < len(mem):
|
|
mem = mem[:size]
|
|
bytes = peda.writemem(address, mem)
|
|
if bytes > 0:
|
|
msg("Written %d bytes to 0x%x" % (bytes, address))
|
|
else:
|
|
warning_msg("failed to load filename to memory")
|
|
else:
|
|
self._missing_argument()
|
|
return
|
|
|
|
# cmpmem()
|
|
def cmpmem(self, *arg):
|
|
"""
|
|
Compare content of a memory region with a file
|
|
Usage:
|
|
MYNAME start end file
|
|
"""
|
|
(start, end, filename) = normalize_argv(arg, 3)
|
|
if filename is None:
|
|
self._missing_argument()
|
|
|
|
try:
|
|
buf = open(filename, "rb").read()
|
|
except:
|
|
error_msg("cannot read data from filename %s" % filename)
|
|
return
|
|
|
|
result = peda.cmpmem(start, end, buf)
|
|
|
|
if result is None:
|
|
warning_msg("failed to perform comparison")
|
|
elif result == {}:
|
|
msg("mem and filename are identical")
|
|
else:
|
|
msg("--- mem: %s -> %s" % (arg[0], arg[1]), "green", "bold")
|
|
msg("+++ filename: %s" % arg[2], "blue", "bold")
|
|
for (addr, bytes_) in result.items():
|
|
msg("@@ 0x%x @@" % addr, "red")
|
|
line_1 = "- "
|
|
line_2 = "+ "
|
|
for (mem_val, file_val) in bytes_:
|
|
m_byte = "%02X " % ord(mem_val)
|
|
f_byte = "%02X " % ord(file_val)
|
|
if mem_val == file_val:
|
|
line_1 += m_byte
|
|
line_2 += f_byte
|
|
else:
|
|
line_1 += green(m_byte)
|
|
line_2 += blue(f_byte)
|
|
msg(line_1)
|
|
msg(line_2)
|
|
return
|
|
|
|
# xormem()
|
|
def xormem(self, *arg):
|
|
"""
|
|
XOR a memory region with a key
|
|
Usage:
|
|
MYNAME start end key
|
|
"""
|
|
(start, end, key) = normalize_argv(arg, 3)
|
|
if key is None:
|
|
self._missing_argument()
|
|
|
|
result = peda.xormem(start, end, key)
|
|
if result is not None:
|
|
msg("XORed data (first 32 bytes):")
|
|
msg('"' + to_hexstr(result[:32]) + '"')
|
|
return
|
|
|
|
# searchmem(), searchmem_by_range()
|
|
def searchmem(self, *arg):
|
|
"""
|
|
Search for a pattern in memory; support regex search
|
|
Usage:
|
|
MYNAME pattern start end
|
|
MYNAME pattern mapname
|
|
"""
|
|
(pattern, start, end) = normalize_argv(arg, 3)
|
|
(pattern, mapname) = normalize_argv(arg, 2)
|
|
if pattern is None:
|
|
self._missing_argument()
|
|
|
|
pattern = arg[0]
|
|
result = []
|
|
if end is None and to_int(mapname):
|
|
vmrange = peda.get_vmrange(mapname)
|
|
if vmrange:
|
|
(start, end, _, _) = vmrange
|
|
|
|
if end is None:
|
|
msg("Searching for %s in: %s ranges" % (repr(pattern), mapname))
|
|
result = peda.searchmem_by_range(mapname, pattern)
|
|
else:
|
|
msg("Searching for %s in range: 0x%x - 0x%x" % (repr(pattern), start, end))
|
|
result = peda.searchmem(start, end, pattern)
|
|
|
|
text = peda.format_search_result(result)
|
|
pager(text)
|
|
|
|
return
|
|
|
|
# search_reference()
|
|
def refsearch(self, *arg):
|
|
"""
|
|
Search for all references to a value in memory ranges
|
|
Usage:
|
|
MYNAME value mapname
|
|
MYNAME value (search in all memory ranges)
|
|
"""
|
|
(search, mapname) = normalize_argv(arg, 2)
|
|
if search is None:
|
|
self._missing_argument()
|
|
|
|
search = arg[0]
|
|
if mapname is None:
|
|
mapname = "all"
|
|
msg("Searching for reference to: %s in: %s ranges" % (repr(search), mapname))
|
|
result = peda.search_reference(search, mapname)
|
|
|
|
text = peda.format_search_result(result)
|
|
pager(text)
|
|
|
|
return
|
|
|
|
# search_address(), search_pointer()
|
|
def lookup(self, *arg):
|
|
"""
|
|
Search for all addresses/references to addresses which belong to a memory range
|
|
Usage:
|
|
MYNAME address searchfor belongto
|
|
MYNAME pointer searchfor belongto
|
|
"""
|
|
(option, searchfor, belongto) = normalize_argv(arg, 3)
|
|
if option is None:
|
|
self._missing_argument()
|
|
|
|
result = []
|
|
if searchfor is None:
|
|
searchfor = "stack"
|
|
if belongto is None:
|
|
belongto = "binary"
|
|
|
|
if option == "pointer":
|
|
msg("Searching for pointers on: %s pointed to: %s, this may take minutes to complete..." % (searchfor, belongto))
|
|
result = peda.search_pointer(searchfor, belongto)
|
|
if option == "address":
|
|
msg("Searching for addresses on: %s belong to: %s, this may take minutes to complete..." % (searchfor, belongto))
|
|
result = peda.search_address(searchfor, belongto)
|
|
|
|
text = peda.format_search_result(result, 0)
|
|
pager(text)
|
|
|
|
return
|
|
lookup.options = ["address", "pointer"]
|
|
|
|
# examine_mem_reference()
|
|
def telescope(self, *arg):
|
|
"""
|
|
Display memory content at an address with smart dereferences
|
|
Usage:
|
|
MYNAME [linecount] (analyze at current $SP)
|
|
MYNAME address [linecount]
|
|
"""
|
|
|
|
(address, count) = normalize_argv(arg, 2)
|
|
|
|
if self._is_running():
|
|
sp = peda.getreg("sp")
|
|
else:
|
|
sp = None
|
|
|
|
if count is None:
|
|
count = 8
|
|
if address is None:
|
|
address = sp
|
|
elif address < 0x1000:
|
|
count = address
|
|
address = sp
|
|
|
|
if not address:
|
|
return
|
|
|
|
step = peda.intsize()
|
|
if not peda.is_address(address): # cannot determine address
|
|
msg("Invalid $SP address: 0x%x" % address, "red")
|
|
return
|
|
for i in range(count):
|
|
if not peda.execute("x/%sx 0x%x" % ("g" if step == 8 else "w", address + i*step)):
|
|
break
|
|
return
|
|
|
|
result = []
|
|
for i in range(count):
|
|
value = address + i*step
|
|
if peda.is_address(value):
|
|
result += [peda.examine_mem_reference(value)]
|
|
else:
|
|
result += [None]
|
|
idx = 0
|
|
text = ""
|
|
for chain in result:
|
|
text += "%04d| " % (idx)
|
|
text += format_reference_chain(chain)
|
|
text += "\n"
|
|
idx += step
|
|
|
|
pager(text)
|
|
|
|
return
|
|
|
|
def eflags(self, *arg):
|
|
"""
|
|
Display/set/clear/toggle value of eflags register
|
|
Usage:
|
|
MYNAME
|
|
MYNAME [set|clear|toggle] flagname
|
|
"""
|
|
FLAGS = ["CF", "PF", "AF", "ZF", "SF", "TF", "IF", "DF", "OF"]
|
|
FLAGS_TEXT = ["Carry", "Parity", "Adjust", "Zero", "Sign", "Trap",
|
|
"Interrupt", "Direction", "Overflow"]
|
|
|
|
(option, flagname) = normalize_argv(arg, 2)
|
|
if not self._is_running():
|
|
return
|
|
|
|
elif option and not flagname:
|
|
self._missing_argument()
|
|
|
|
elif option is None: # display eflags
|
|
flags = peda.get_eflags()
|
|
text = ""
|
|
for (i, f) in enumerate(FLAGS):
|
|
if flags[f]:
|
|
text += "%s " % red(FLAGS_TEXT[i].upper(), "bold")
|
|
else:
|
|
text += "%s " % green(FLAGS_TEXT[i].lower())
|
|
|
|
eflags = peda.getreg("eflags")
|
|
msg("%s: 0x%x (%s)" % (green("EFLAGS"), eflags, text.strip()))
|
|
|
|
elif option == "set":
|
|
peda.set_eflags(flagname, True)
|
|
|
|
elif option == "clear":
|
|
peda.set_eflags(flagname, False)
|
|
|
|
elif option == "toggle":
|
|
peda.set_eflags(flagname, None)
|
|
|
|
return
|
|
eflags.options = ["set", "clear", "toggle"]
|
|
|
|
def xinfo(self, *arg):
|
|
"""
|
|
Display detail information of address/registers
|
|
Usage:
|
|
MYNAME address
|
|
MYNAME register [reg1 reg2]
|
|
"""
|
|
|
|
(address, regname) = normalize_argv(arg, 2)
|
|
if address is None:
|
|
self._missing_argument()
|
|
|
|
text = ""
|
|
if not self._is_running():
|
|
return
|
|
|
|
def get_reg_text(r, v):
|
|
text = green("%s" % r.upper().ljust(3)) + ": "
|
|
chain = peda.examine_mem_reference(v)
|
|
text += format_reference_chain(chain)
|
|
text += "\n"
|
|
return text
|
|
|
|
(arch, bits) = peda.getarch()
|
|
if str(address).startswith("r"):
|
|
# Register
|
|
regs = peda.getregs(" ".join(arg[1:]))
|
|
if regname is None:
|
|
for r in REGISTERS[bits]:
|
|
if r in regs:
|
|
text += get_reg_text(r, regs[r])
|
|
else:
|
|
for (r, v) in sorted(regs.items()):
|
|
text += get_reg_text(r, v)
|
|
if text:
|
|
msg(text.strip())
|
|
if regname is None or "eflags" in regname:
|
|
self.eflags()
|
|
return
|
|
|
|
elif to_int(address) is None:
|
|
warning_msg("not a register nor an address")
|
|
else:
|
|
# Address
|
|
chain = peda.examine_mem_reference(address, depth=0)
|
|
text += format_reference_chain(chain) + "\n"
|
|
vmrange = peda.get_vmrange(address)
|
|
if vmrange:
|
|
(start, end, perm, name) = vmrange
|
|
text += "Virtual memory mapping:\n"
|
|
text += green("Start : %s\n" % to_address(start))
|
|
text += green("End : %s\n" % to_address(end))
|
|
text += yellow("Offset: 0x%x\n" % (address-start))
|
|
text += red("Perm : %s\n" % perm)
|
|
text += blue("Name : %s" % name)
|
|
msg(text)
|
|
|
|
return
|
|
xinfo.options = ["register"]
|
|
|
|
def strings(self, *arg):
|
|
"""
|
|
Display printable strings in memory
|
|
Usage:
|
|
MYNAME start end [minlen]
|
|
MYNAME mapname [minlen]
|
|
MYNAME (display all printable strings in binary - slow)
|
|
"""
|
|
(start, end, minlen) = normalize_argv(arg, 3)
|
|
|
|
mapname = None
|
|
if start is None:
|
|
mapname = "binary"
|
|
elif to_int(start) is None or (end < start):
|
|
(mapname, minlen) = normalize_argv(arg, 2)
|
|
|
|
if minlen is None:
|
|
minlen = 1
|
|
|
|
if mapname:
|
|
maps = peda.get_vmmap(mapname)
|
|
else:
|
|
maps = [(start, end, None, None)]
|
|
|
|
if not maps:
|
|
warning_msg("failed to get memory map for %s" % mapname)
|
|
return
|
|
|
|
text = ""
|
|
regex_pattern = "[%s]{%d,}" % (re.escape(string.printable), minlen)
|
|
p = re.compile(regex_pattern.encode('utf-8'))
|
|
for (start, end, _, _) in maps:
|
|
mem = peda.dumpmem(start, end)
|
|
if not mem: continue
|
|
found = p.finditer(mem)
|
|
if not found: continue
|
|
|
|
for m in found:
|
|
text += "0x%x: %s\n" % (start+m.start(), string_repr(mem[m.start():m.end()].strip(), show_quotes=False))
|
|
|
|
pager(text)
|
|
return
|
|
|
|
def sgrep(self, *arg):
|
|
"""
|
|
Search for full strings contain the given pattern
|
|
Usage:
|
|
MYNAME pattern start end
|
|
MYNAME pattern mapname
|
|
MYNAME pattern
|
|
"""
|
|
(pattern,) = normalize_argv(arg, 1)
|
|
|
|
if pattern is None:
|
|
self._missing_argument()
|
|
arg = list(arg[1:])
|
|
if not arg:
|
|
arg = ["binary"]
|
|
|
|
pattern = "[^\x00]*%s[^\x00]*" % pattern
|
|
self.searchmem(pattern, *arg)
|
|
|
|
return
|
|
|
|
|
|
###############################
|
|
# Exploit Helper Commands #
|
|
###############################
|
|
# elfheader()
|
|
def elfheader(self, *arg):
|
|
"""
|
|
Get headers information from debugged ELF file
|
|
Usage:
|
|
MYNAME [header_name]
|
|
"""
|
|
|
|
(name,) = normalize_argv(arg, 1)
|
|
result = peda.elfheader(name)
|
|
if len(result) == 0:
|
|
warning_msg("%s not found, did you specify the FILE to debug?" % (name if name else "headers"))
|
|
elif len(result) == 1:
|
|
(k, (start, end, type)) = list(result.items())[0]
|
|
msg("%s: 0x%x - 0x%x (%s)" % (k, start, end, type))
|
|
else:
|
|
for (k, (start, end, type)) in sorted(result.items(), key=lambda x: x[1]):
|
|
msg("%s = 0x%x" % (k, start))
|
|
return
|
|
|
|
# readelf_header(), elfheader_solib()
|
|
def readelf(self, *arg):
|
|
"""
|
|
Get headers information from an ELF file
|
|
Usage:
|
|
MYNAME mapname [header_name]
|
|
MYNAME filename [header_name]
|
|
"""
|
|
|
|
(filename, hname) = normalize_argv(arg, 2)
|
|
result = {}
|
|
maps = peda.get_vmmap()
|
|
if filename is None: # fallback to elfheader()
|
|
result = peda.elfheader()
|
|
else:
|
|
result = peda.elfheader_solib(filename, hname)
|
|
|
|
if not result:
|
|
result = peda.readelf_header(filename, hname)
|
|
if len(result) == 0:
|
|
warning_msg("%s or %s not found" % (filename, hname))
|
|
elif len(result) == 1:
|
|
(k, (start, end, type)) = list(result.items())[0]
|
|
msg("%s: 0x%x - 0x%x (%s)" % (k, start, end, type))
|
|
else:
|
|
for (k, (start, end, type)) in sorted(result.items(), key=lambda x: x[1]):
|
|
msg("%s = 0x%x" % (k, start))
|
|
return
|
|
|
|
# elfsymbol()
|
|
def elfsymbol(self, *arg):
|
|
"""
|
|
Get non-debugging symbol information from an ELF file
|
|
Usage:
|
|
MYNAME symbol_name
|
|
"""
|
|
(name,) = normalize_argv(arg, 1)
|
|
if not peda.getfile():
|
|
warning_msg("please specify a file to debug")
|
|
return
|
|
|
|
result = peda.elfsymbol(name)
|
|
if len(result) == 0:
|
|
msg("'%s': no match found" % (name if name else "plt symbols"))
|
|
else:
|
|
if ("%s@got" % name) not in result:
|
|
msg("Found %d symbols" % len(result))
|
|
else:
|
|
msg("Detail symbol info")
|
|
for (k, v) in sorted(result.items(), key=lambda x: x[1]):
|
|
msg("%s = %s" % (k, "0x%x" % v if v else repr(v)))
|
|
return
|
|
|
|
# checksec()
|
|
def checksec(self, *arg):
|
|
"""
|
|
Check for various security options of binary
|
|
For full features, use http://www.trapkit.de/tools/checksec.sh
|
|
Usage:
|
|
MYNAME [file]
|
|
"""
|
|
(filename,) = normalize_argv(arg, 1)
|
|
colorcodes = {
|
|
0: red("disabled"),
|
|
1: green("ENABLED"),
|
|
2: yellow("Partial"),
|
|
3: green("FULL"),
|
|
4: yellow("Dynamic Shared Object"),
|
|
}
|
|
|
|
result = peda.checksec(filename)
|
|
if result:
|
|
for (k, v) in sorted(result.items()):
|
|
msg("%s: %s" % (k.ljust(10), colorcodes[v]))
|
|
return
|
|
|
|
def nxtest(self, *arg):
|
|
"""
|
|
Perform real NX test to see if it is enabled/supported by OS
|
|
Usage:
|
|
MYNAME [address]
|
|
"""
|
|
(address,) = normalize_argv(arg, 1)
|
|
|
|
exec_wrapper = peda.execute_redirect("show exec-wrapper").split('"')[1]
|
|
if exec_wrapper != "":
|
|
peda.execute("unset exec-wrapper")
|
|
|
|
if not peda.getpid(): # start program if not running
|
|
peda.execute("start")
|
|
|
|
# set current PC => address, continue
|
|
pc = peda.getreg("pc")
|
|
sp = peda.getreg("sp")
|
|
if not address:
|
|
address = sp
|
|
peda.execute("set $pc = 0x%x" % address)
|
|
# set value at address => 0xcc
|
|
peda.execute("set *0x%x = 0x%x" % (address, 0xcccccccc))
|
|
peda.execute("set *0x%x = 0x%x" % (address+4, 0xcccccccc))
|
|
out = peda.execute_redirect("continue")
|
|
text = "NX test at %s: " % (to_address(address) if address != sp else "stack")
|
|
|
|
if out:
|
|
if "SIGSEGV" in out:
|
|
text += red("Non-Executable")
|
|
elif "SIGTRAP" in out:
|
|
text += green("Executable")
|
|
else:
|
|
text += "Failed to test"
|
|
|
|
msg(text)
|
|
# restore exec-wrapper
|
|
if exec_wrapper != "":
|
|
peda.execute("set exec-wrapper %s" % exec_wrapper)
|
|
|
|
return
|
|
|
|
# search_asm()
|
|
def asmsearch(self, *arg):
|
|
"""
|
|
Search for ASM instructions in memory
|
|
Usage:
|
|
MYNAME "asmcode" start end
|
|
MYNAME "asmcode" mapname
|
|
"""
|
|
(asmcode, start, end) = normalize_argv(arg, 3)
|
|
if asmcode is None:
|
|
self._missing_argument()
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
asmcode = arg[0]
|
|
result = []
|
|
if end is None:
|
|
mapname = start
|
|
if mapname is None:
|
|
mapname = "binary"
|
|
maps = peda.get_vmmap(mapname)
|
|
msg("Searching for ASM code: %s in: %s ranges" % (repr(asmcode), mapname))
|
|
for (start, end, _, _) in maps:
|
|
if not peda.is_executable(start, maps): continue # skip non-executable page
|
|
result += peda.search_asm(start, end, asmcode)
|
|
else:
|
|
msg("Searching for ASM code: %s in range: 0x%x - 0x%x" % (repr(asmcode), start, end))
|
|
result = peda.search_asm(start, end, asmcode)
|
|
|
|
text = "Not found"
|
|
if result:
|
|
text = ""
|
|
for (addr, (byte, code)) in result:
|
|
text += "%s : (%s)\t%s\n" % (to_address(addr), byte.decode('utf-8'), code)
|
|
pager(text)
|
|
|
|
return
|
|
|
|
# search_asm()
|
|
def ropsearch(self, *arg):
|
|
"""
|
|
Search for ROP gadgets in memory
|
|
Note: only for simple gadgets, for full ROP search try: http://ropshell.com
|
|
Usage:
|
|
MYNAME "gadget" start end
|
|
MYNAME "gadget" pagename
|
|
"""
|
|
|
|
(asmcode, start, end) = normalize_argv(arg, 3)
|
|
if asmcode is None:
|
|
self._missing_argument()
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
asmcode = arg[0]
|
|
result = []
|
|
if end is None:
|
|
if start is None:
|
|
mapname = "binary"
|
|
else:
|
|
mapname = start
|
|
maps = peda.get_vmmap(mapname)
|
|
msg("Searching for ROP gadget: %s in: %s ranges" % (repr(asmcode), mapname))
|
|
for (start, end, _, _) in maps:
|
|
if not peda.is_executable(start, maps): continue # skip non-executable page
|
|
result += peda.search_asm(start, end, asmcode, rop=1)
|
|
else:
|
|
msg("Searching for ROP gadget: %s in range: 0x%x - 0x%x" % (repr(asmcode), start, end))
|
|
result = peda.search_asm(start, end, asmcode, rop=1)
|
|
|
|
result = sorted(result, key=lambda x: len(x[1][0]))
|
|
text = "Not found"
|
|
if result:
|
|
text = ""
|
|
for (addr, (byte, code)) in result:
|
|
text += "%s : (%s)\t%s\n" % (to_address(addr), byte, code)
|
|
pager(text)
|
|
|
|
return
|
|
|
|
# dumprop()
|
|
def dumprop(self, *arg):
|
|
"""
|
|
Dump all ROP gadgets in specific memory range
|
|
Note: only for simple gadgets, for full ROP search try: http://ropshell.com
|
|
Warning: this can be very slow, do not run for big memory range
|
|
Usage:
|
|
MYNAME start end [keyword] [depth]
|
|
MYNAME mapname [keyword]
|
|
default gadget instruction depth is: 5
|
|
"""
|
|
|
|
(start, end, keyword, depth) = normalize_argv(arg, 4)
|
|
filename = peda.getfile()
|
|
if filename is None:
|
|
warning_msg("please specify a filename to debug")
|
|
return
|
|
|
|
filename = os.path.basename(filename)
|
|
mapname = None
|
|
if start is None:
|
|
mapname = "binary"
|
|
elif end is None:
|
|
mapname = start
|
|
elif to_int(end) is None:
|
|
mapname = start
|
|
keyword = end
|
|
|
|
if depth is None:
|
|
depth = 5
|
|
|
|
result = {}
|
|
warning_msg("this can be very slow, do not run for large memory range")
|
|
if mapname:
|
|
maps = peda.get_vmmap(mapname)
|
|
for (start, end, _, _) in maps:
|
|
if not peda.is_executable(start, maps): continue # skip non-executable page
|
|
result.update(peda.dumprop(start, end, keyword))
|
|
else:
|
|
result.update(peda.dumprop(start, end, keyword))
|
|
|
|
text = "Not found"
|
|
if len(result) > 0:
|
|
text = ""
|
|
outfile = "%s-rop.txt" % filename
|
|
fd = open(outfile, "w")
|
|
msg("Writing ROP gadgets to file: %s ..." % outfile)
|
|
for (code, addr) in sorted(result.items(), key = lambda x:len(x[0])):
|
|
text += "0x%x: %s\n" % (addr, code)
|
|
fd.write("0x%x: %s\n" % (addr, code))
|
|
fd.close()
|
|
|
|
pager(text)
|
|
return
|
|
|
|
# common_rop_gadget()
|
|
def ropgadget(self, *arg):
|
|
"""
|
|
Get common ROP gadgets of binary or library
|
|
Usage:
|
|
MYNAME [mapname]
|
|
"""
|
|
|
|
(mapname,) = normalize_argv(arg, 1)
|
|
result = peda.common_rop_gadget(mapname)
|
|
if not result:
|
|
msg("Not found")
|
|
else:
|
|
text = ""
|
|
for (k, v) in sorted(result.items(), key=lambda x: len(x[0]) if not x[0].startswith("add") else int(x[0].split("_")[1])):
|
|
text += "%s = 0x%x\n" % (k, v)
|
|
pager(text)
|
|
|
|
return
|
|
|
|
# search_jmpcall()
|
|
def jmpcall(self, *arg):
|
|
"""
|
|
Search for JMP/CALL instructions in memory
|
|
Usage:
|
|
MYNAME (search all JMP/CALL in current binary)
|
|
MYNAME reg [mapname]
|
|
MYNAME reg start end
|
|
"""
|
|
|
|
(reg, start, end) = normalize_argv(arg, 3)
|
|
result = []
|
|
if not self._is_running():
|
|
return
|
|
|
|
mapname = None
|
|
if start is None:
|
|
mapname = "binary"
|
|
elif end is None:
|
|
mapname = start
|
|
|
|
if mapname:
|
|
maps = peda.get_vmmap(mapname)
|
|
for (start, end, _, _) in maps:
|
|
if not peda.is_executable(start, maps): continue
|
|
result += peda.search_jmpcall(start, end, reg)
|
|
else:
|
|
result = peda.search_jmpcall(start, end, reg)
|
|
|
|
if not result:
|
|
msg("Not found")
|
|
else:
|
|
text = ""
|
|
for (a, v) in result:
|
|
text += "0x%x : %s\n" % (a, v)
|
|
pager(text)
|
|
|
|
return
|
|
|
|
# cyclic_pattern()
|
|
def pattern_create(self, *arg):
|
|
"""
|
|
Generate a cyclic pattern
|
|
Set "pattern" option for basic/extended pattern type
|
|
Usage:
|
|
MYNAME size [file]
|
|
"""
|
|
|
|
(size, filename) = normalize_argv(arg, 2)
|
|
if size is None:
|
|
self._missing_argument()
|
|
|
|
pattern = cyclic_pattern(size)
|
|
if filename is not None:
|
|
open(filename, "wb").write(pattern)
|
|
msg("Writing pattern of %d chars to filename \"%s\"" % (len(pattern), filename))
|
|
else:
|
|
msg("'" + pattern.decode('utf-8') + "'")
|
|
|
|
return
|
|
|
|
# cyclic_pattern()
|
|
def pattern_offset(self, *arg):
|
|
"""
|
|
Search for offset of a value in cyclic pattern
|
|
Set "pattern" option for basic/extended pattern type
|
|
Usage:
|
|
MYNAME value
|
|
"""
|
|
|
|
(value,) = normalize_argv(arg, 1)
|
|
if value is None:
|
|
self._missing_argument()
|
|
|
|
pos = cyclic_pattern_offset(value)
|
|
if pos is None:
|
|
msg("%s not found in pattern buffer" % value)
|
|
else:
|
|
msg("%s found at offset: %d" % (value, pos))
|
|
|
|
return
|
|
|
|
# cyclic_pattern(), searchmem_*()
|
|
def pattern_search(self, *arg):
|
|
"""
|
|
Search a cyclic pattern in registers and memory
|
|
Set "pattern" option for basic/extended pattern type
|
|
Usage:
|
|
MYNAME
|
|
"""
|
|
def nearby_offset(v):
|
|
for offset in range(-128, 128, 4):
|
|
pos = cyclic_pattern_offset(v + offset)
|
|
if pos is not None:
|
|
return (pos, offset)
|
|
return None
|
|
|
|
if not self._is_running():
|
|
return
|
|
|
|
reg_result = {}
|
|
regs = peda.getregs()
|
|
|
|
# search for registers with value in pattern buffer
|
|
for (r, v) in regs.items():
|
|
if len(to_hex(v)) < 8: continue
|
|
res = nearby_offset(v)
|
|
if res:
|
|
reg_result[r] = res
|
|
|
|
if reg_result:
|
|
msg("Registers contain pattern buffer:", "red")
|
|
for (r, (p, o)) in reg_result.items():
|
|
msg("%s+%d found at offset: %d" % (r.upper(), o, p))
|
|
else:
|
|
msg("No register contains pattern buffer")
|
|
|
|
# search for registers which point to pattern buffer
|
|
reg_result = {}
|
|
for (r, v) in regs.items():
|
|
if not peda.is_address(v): continue
|
|
chain = peda.examine_mem_reference(v)
|
|
(v, t, vn) = chain[-1]
|
|
if not vn: continue
|
|
o = cyclic_pattern_offset(vn.strip("'").strip('"')[:4])
|
|
if o is not None:
|
|
reg_result[r] = (len(chain), len(vn)-2, o)
|
|
|
|
if reg_result:
|
|
msg("Registers point to pattern buffer:", "yellow")
|
|
for (r, (d, l, o)) in reg_result.items():
|
|
msg("[%s] %s offset %d - size ~%d" % (r.upper(), "-->"*d, o, l))
|
|
else:
|
|
msg("No register points to pattern buffer")
|
|
|
|
# search for pattern buffer in memory
|
|
maps = peda.get_vmmap()
|
|
search_result = []
|
|
for (start, end, perm, name) in maps:
|
|
if "w" not in perm: continue # only search in writable memory
|
|
res = cyclic_pattern_search(peda.dumpmem(start, end))
|
|
for (a, l, o) in res:
|
|
a += start
|
|
search_result += [(a, l, o)]
|
|
|
|
sp = peda.getreg("sp")
|
|
if search_result:
|
|
msg("Pattern buffer found at:", "green")
|
|
for (a, l, o) in search_result:
|
|
ranges = peda.get_vmrange(a)
|
|
text = "%s : offset %4d - size %4d" % (to_address(a), o, l)
|
|
if ranges[3] == "[stack]":
|
|
text += " ($sp + %s [%d dwords])" % (to_hex(a-sp), (a-sp)//4)
|
|
else:
|
|
text += " (%s)" % ranges[3]
|
|
msg(text)
|
|
else:
|
|
msg("Pattern buffer not found in memory")
|
|
|
|
# search for references to pattern buffer in memory
|
|
ref_result = []
|
|
for (a, l, o) in search_result:
|
|
res = peda.searchmem_by_range("all", "0x%x" % a)
|
|
ref_result += [(x[0], a) for x in res]
|
|
if len(ref_result) > 0:
|
|
msg("References to pattern buffer found at:", "blue")
|
|
for (a, v) in ref_result:
|
|
ranges = peda.get_vmrange(a)
|
|
text = "%s : %s" % (to_address(a), to_address(v))
|
|
if ranges[3] == "[stack]":
|
|
text += " ($sp + %s [%d dwords])" % (to_hex(a-sp), (a-sp)//4)
|
|
else:
|
|
text += " (%s)" % ranges[3]
|
|
msg(text)
|
|
else:
|
|
msg("Reference to pattern buffer not found in memory")
|
|
|
|
return
|
|
|
|
# cyclic_pattern(), writemem()
|
|
def pattern_patch(self, *arg):
|
|
"""
|
|
Write a cyclic pattern to memory
|
|
Set "pattern" option for basic/extended pattern type
|
|
Usage:
|
|
MYNAME address size
|
|
"""
|
|
|
|
(address, size) = normalize_argv(arg, 2)
|
|
if size is None:
|
|
self._missing_argument()
|
|
|
|
pattern = cyclic_pattern(size)
|
|
num_bytes_written = peda.writemem(address, pattern)
|
|
if num_bytes_written:
|
|
msg("Written %d chars of cyclic pattern to 0x%x" % (size, address))
|
|
else:
|
|
msg("Failed to write to memory")
|
|
|
|
return
|
|
|
|
# cyclic_pattern()
|
|
def pattern_arg(self, *arg):
|
|
"""
|
|
Set argument list with cyclic pattern
|
|
Set "pattern" option for basic/extended pattern type
|
|
Usage:
|
|
MYNAME size1 [size2,offset2] ...
|
|
"""
|
|
|
|
if not arg:
|
|
self._missing_argument()
|
|
|
|
arglist = []
|
|
for a in arg:
|
|
(size, offset) = (a + ",").split(",")[:2]
|
|
if offset:
|
|
offset = to_int(offset)
|
|
else:
|
|
offset = 0
|
|
size = to_int(size)
|
|
if size is None or offset is None:
|
|
self._missing_argument()
|
|
|
|
# try to generate unique, non-overlapped patterns
|
|
if arglist and offset == 0:
|
|
offset = sum(arglist[-1])
|
|
arglist += [(size, offset)]
|
|
|
|
patterns = []
|
|
for (s, o) in arglist:
|
|
patterns += ["\'%s\'" % cyclic_pattern(s, o).decode('utf-8')]
|
|
peda.execute("set arg %s" % " ".join(patterns))
|
|
msg("Set %d arguments to program" % len(patterns))
|
|
|
|
return
|
|
|
|
# cyclic_pattern()
|
|
def pattern_env(self, *arg):
|
|
"""
|
|
Set environment variable with a cyclic pattern
|
|
Set "pattern" option for basic/extended pattern type
|
|
Usage:
|
|
MYNAME ENVNAME size[,offset]
|
|
"""
|
|
|
|
(env, size) = normalize_argv(arg, 2)
|
|
if size is None:
|
|
self._missing_argument()
|
|
|
|
(size, offset) = (arg[1] + ",").split(",")[:2]
|
|
size = to_int(size)
|
|
if offset:
|
|
offset = to_int(offset)
|
|
else:
|
|
offset = 0
|
|
if size is None or offset is None:
|
|
self._missing_argument()
|
|
|
|
peda.execute("set env %s %s" % (env, cyclic_pattern(size, offset).decode('utf-8')))
|
|
msg("Set environment %s = cyclic_pattern(%d, %d)" % (env, size, offset))
|
|
|
|
return
|
|
|
|
def pattern(self, *arg):
|
|
"""
|
|
Generate, search, or write a cyclic pattern to memory
|
|
Set "pattern" option for basic/extended pattern type
|
|
Usage:
|
|
MYNAME create size [file]
|
|
MYNAME offset value
|
|
MYNAME search
|
|
MYNAME patch address size
|
|
MYNAME arg size1 [size2,offset2]
|
|
MYNAME env size[,offset]
|
|
"""
|
|
|
|
options = ["create", "offset", "search", "patch", "arg", "env"]
|
|
(opt,) = normalize_argv(arg, 1)
|
|
if opt is None or opt not in options:
|
|
self._missing_argument()
|
|
|
|
func = getattr(self, "pattern_%s" % opt)
|
|
func(*arg[1:])
|
|
|
|
return
|
|
pattern.options = ["create", "offset", "search", "patch", "arg", "env"]
|
|
|
|
def substr(self, *arg):
|
|
"""
|
|
Search for substrings of a given string/number in memory
|
|
Commonly used for ret2strcpy ROP exploit
|
|
Usage:
|
|
MYNAME "string" start end
|
|
MYNAME "string" [mapname] (default is search in current binary)
|
|
"""
|
|
(search, start, end) = normalize_argv(arg, 3)
|
|
if search is None:
|
|
self._missing_argument()
|
|
|
|
result = []
|
|
search = arg[0]
|
|
mapname = None
|
|
if start is None:
|
|
mapname = "binary"
|
|
elif end is None:
|
|
mapname = start
|
|
|
|
if mapname:
|
|
msg("Searching for sub strings of: %s in: %s ranges" % (repr(search), mapname))
|
|
maps = peda.get_vmmap(mapname)
|
|
for (start, end, perm, _) in maps:
|
|
if perm == "---p": # skip private range
|
|
continue
|
|
result = peda.search_substr(start, end, search)
|
|
if result: # return the first found result
|
|
break
|
|
else:
|
|
msg("Searching for sub strings of: %s in range: 0x%x - 0x%x" % (repr(search), start, end))
|
|
result = peda.search_substr(start, end, search)
|
|
|
|
if result:
|
|
msg("# (address, target_offset), # value (address=0xffffffff means not found)")
|
|
offset = 0
|
|
for (k, v) in result:
|
|
msg("(0x%x, %d), # %s" % ((0xffffffff if v == -1 else v), offset, string_repr(k)))
|
|
offset += len(k)
|
|
else:
|
|
msg("Not found")
|
|
|
|
return
|
|
|
|
def assemble(self, *arg):
|
|
"""
|
|
On the fly assemble and execute instructions using NASM
|
|
Usage:
|
|
MYNAME [mode] [address]
|
|
mode: -b16 / -b32 / -b64
|
|
"""
|
|
(mode, address) = normalize_argv(arg, 2)
|
|
|
|
exec_mode = 0
|
|
write_mode = 0
|
|
if to_int(mode) is not None:
|
|
address, mode = mode, None
|
|
|
|
(arch, bits) = peda.getarch()
|
|
if mode is None:
|
|
mode = bits
|
|
else:
|
|
mode = to_int(mode[2:])
|
|
if mode not in [16, 32, 64]:
|
|
self._missing_argument()
|
|
|
|
if self._is_running() and address == peda.getreg("pc"):
|
|
write_mode = exec_mode = 1
|
|
|
|
line = peda.execute_redirect("show write")
|
|
if line and "on" in line.split()[-1]:
|
|
write_mode = 1
|
|
|
|
if address is None or mode != bits:
|
|
write_mode = exec_mode = 0
|
|
|
|
if write_mode:
|
|
msg("Instruction will be written to 0x%x" % address)
|
|
else:
|
|
msg("Instructions will be written to stdout")
|
|
|
|
msg("Type instructions (NASM syntax), one or more per line separated by \";\"")
|
|
msg("End with a line saying just \"end\"")
|
|
|
|
if not write_mode:
|
|
address = 0xdeadbeef
|
|
|
|
inst_list = []
|
|
inst_code = b""
|
|
# fetch instruction loop
|
|
while True:
|
|
inst = input("iasm|0x%x> " % address)
|
|
if inst == "end":
|
|
break
|
|
if inst == "":
|
|
continue
|
|
bincode = peda.assemble(inst, mode)
|
|
size = len(bincode)
|
|
if size == 0:
|
|
continue
|
|
inst_list.append((size, bincode, inst))
|
|
if write_mode:
|
|
peda.writemem(address, bincode)
|
|
# execute assembled code
|
|
if exec_mode:
|
|
peda.execute("stepi %d" % (inst.count(";")+1))
|
|
|
|
address += size
|
|
inst_code += bincode
|
|
msg("hexify: \"%s\"" % to_hexstr(bincode))
|
|
|
|
text = Nasm.format_shellcode(b"".join([x[1] for x in inst_list]), mode)
|
|
if text:
|
|
msg("Assembled%s instructions:" % ("/Executed" if exec_mode else ""))
|
|
msg(text)
|
|
msg("hexify: \"%s\"" % to_hexstr(inst_code))
|
|
|
|
return
|
|
|
|
|
|
####################################
|
|
# Payload/Shellcode Generation #
|
|
####################################
|
|
def skeleton(self, *arg):
|
|
"""
|
|
Generate python exploit code template
|
|
Usage:
|
|
MYNAME type [file]
|
|
type = argv: local exploit via argument
|
|
type = env: local exploit via crafted environment (including NULL byte)
|
|
type = stdin: local exploit via stdin
|
|
type = remote: remote exploit via TCP socket
|
|
"""
|
|
options = ["argv", "stdin", "env", "remote"]
|
|
(opt, outfile) = normalize_argv(arg, 2)
|
|
if opt not in options:
|
|
self._missing_argument()
|
|
|
|
pattern = cyclic_pattern(20000).decode('utf-8')
|
|
if opt == "argv":
|
|
code = ExploitSkeleton().skeleton_local_argv
|
|
if opt == "env":
|
|
code = ExploitSkeleton().skeleton_local_env
|
|
if opt == "stdin":
|
|
code = ExploitSkeleton().skeleton_local_stdin
|
|
if opt == "remote":
|
|
code = ExploitSkeleton().skeleton_remote_tcp
|
|
|
|
if outfile:
|
|
msg("Writing skeleton code to file \"%s\"" % outfile)
|
|
open(outfile, "w").write(code.strip("\n"))
|
|
os.chmod(outfile, 0o755)
|
|
open("pattern.txt", "w").write(pattern)
|
|
else:
|
|
msg(code)
|
|
|
|
return
|
|
skeleton.options = ["argv", "stdin", "env", "remote"]
|
|
|
|
def shellcode(self, *arg):
|
|
"""
|
|
Generate or download common shellcodes.
|
|
Usage:
|
|
MYNAME generate [arch/]platform type [port] [host]
|
|
MYNAME search keyword (use % for any character wildcard)
|
|
MYNAME display shellcodeId (shellcodeId as appears in search results)
|
|
MYNAME zsc [generate customize shellcode]
|
|
|
|
For generate option:
|
|
default port for bindport shellcode: 16706 (0x4142)
|
|
default host/port for connect back shellcode: 127.127.127.127/16706
|
|
supported arch: x86
|
|
"""
|
|
def list_shellcode():
|
|
"""
|
|
List available shellcodes
|
|
"""
|
|
text = "Available shellcodes:\n"
|
|
for arch in SHELLCODES:
|
|
for platform in SHELLCODES[arch]:
|
|
for sctype in SHELLCODES[arch][platform]:
|
|
text += " %s/%s %s\n" % (arch, platform, sctype)
|
|
msg(text)
|
|
|
|
""" Multiple variable name for different modes """
|
|
(mode, platform, sctype, port, host) = normalize_argv(arg, 5)
|
|
(mode, keyword) = normalize_argv(arg, 2)
|
|
(mode, shellcodeId) = normalize_argv(arg, 2)
|
|
|
|
if mode == "generate":
|
|
arch = "x86"
|
|
if platform and "/" in platform:
|
|
(arch, platform) = platform.split("/")
|
|
|
|
if platform not in SHELLCODES[arch] or not sctype:
|
|
list_shellcode()
|
|
return
|
|
#dbg_print_vars(arch, platform, sctype, port, host)
|
|
try:
|
|
sc = Shellcode(arch, platform).shellcode(sctype, port, host)
|
|
except Exception as e:
|
|
self._missing_argument()
|
|
|
|
if not sc:
|
|
msg("Unknown shellcode")
|
|
return
|
|
|
|
hexstr = to_hexstr(sc)
|
|
linelen = 16 # display 16-bytes per line
|
|
i = 0
|
|
text = "# %s/%s/%s: %d bytes\n" % (arch, platform, sctype, len(sc))
|
|
if sctype in ["bindport", "connect"]:
|
|
text += "# port=%s, host=%s\n" % (port if port else '16706', host if host else '127.127.127.127')
|
|
text += "shellcode = (\n"
|
|
while hexstr:
|
|
text += ' "%s"\n' % (hexstr[:linelen*4])
|
|
hexstr = hexstr[linelen*4:]
|
|
i += 1
|
|
text += ")"
|
|
msg(text)
|
|
|
|
# search shellcodes on shell-storm.org
|
|
elif mode == "search":
|
|
if keyword is None:
|
|
self._missing_argument()
|
|
|
|
res_dl = Shellcode().search(keyword)
|
|
if not res_dl:
|
|
msg("Shellcode not found or cannot retrieve the result")
|
|
return
|
|
|
|
msg("Found %d shellcodes" % len(res_dl))
|
|
msg("%s\t%s" %(blue("ScId"), blue("Title")))
|
|
text = ""
|
|
for data_d in res_dl:
|
|
text += "[%s]\t%s - %s\n" %(yellow(data_d['ScId']), data_d['ScArch'], data_d['ScTitle'])
|
|
pager(text)
|
|
|
|
# download shellcodes from shell-storm.org
|
|
elif mode == "display":
|
|
if to_int(shellcodeId) is None:
|
|
self._missing_argument()
|
|
|
|
res = Shellcode().display(shellcodeId)
|
|
if not res:
|
|
msg("Shellcode id not found or cannot retrieve the result")
|
|
return
|
|
|
|
msg(res)
|
|
#OWASP ZSC API Z3r0D4y.Com
|
|
elif mode == "zsc":
|
|
'os lists'
|
|
oslist = ['linux_x86','linux_x64','linux_arm','linux_mips','freebsd_x86',
|
|
'freebsd_x64','windows_x86','windows_x64','osx','solaris_x64','solaris_x86']
|
|
'functions'
|
|
joblist = ['exec(\'/path/file\')','chmod(\'/path/file\',\'permission number\')','write(\'/path/file\',\'text to write\')',
|
|
'file_create(\'/path/file\',\'text to write\')','dir_create(\'/path/folder\')','download(\'url\',\'filename\')',
|
|
'download_execute(\'url\',\'filename\',\'command to execute\')','system(\'command to execute\')']
|
|
'encode types'
|
|
encodelist = ['none','xor_random','xor_yourvalue','add_random','add_yourvalue','sub_random',
|
|
'sub_yourvalue','inc','inc_timeyouwant','dec','dec_timeyouwant','mix_all']
|
|
try:
|
|
while True:
|
|
for os in oslist:
|
|
msg('%s %s'%(yellow('[+]'),green(os)))
|
|
if pyversion == 2:
|
|
os = input('%s'%blue('os:'))
|
|
if pyversion == 3:
|
|
os = input('%s'%blue('os:'))
|
|
if os in oslist: #check if os exist
|
|
break
|
|
else:
|
|
warning_msg("Wrong input! Try Again.")
|
|
while True:
|
|
for job in joblist:
|
|
msg('%s %s'%(yellow('[+]'),green(job)))
|
|
if pyversion == 2:
|
|
job = raw_input('%s'%blue('job:'))
|
|
if pyversion == 3:
|
|
job = input('%s'%blue('job:'))
|
|
if job != '':
|
|
break
|
|
else:
|
|
warning_msg("Please enter a function.")
|
|
while True:
|
|
for encode in encodelist:
|
|
msg('%s %s'%(yellow('[+]'),green(encode)))
|
|
if pyversion == 2:
|
|
encode = raw_input('%s'%blue('encode:'))
|
|
if pyversion == 3:
|
|
encode = input('%s'%blue('encode:'))
|
|
if encode != '':
|
|
break
|
|
else:
|
|
warning_msg("Please enter a encode type.")
|
|
except (KeyboardInterrupt, SystemExit):
|
|
warning_msg("Aborted by user")
|
|
result = Shellcode().zsc(os,job,encode)
|
|
if result is not None:
|
|
msg(result)
|
|
else:
|
|
pass
|
|
return
|
|
else:
|
|
self._missing_argument()
|
|
|
|
return
|
|
shellcode.options = ["generate", "search", "display","zsc"]
|
|
|
|
def gennop(self, *arg):
|
|
"""
|
|
Generate abitrary length NOP sled using given characters
|
|
Usage:
|
|
MYNAME size [chars]
|
|
"""
|
|
(size, chars) = normalize_argv(arg, 2)
|
|
if size is None:
|
|
self._missing_argument()
|
|
|
|
nops = Shellcode.gennop(size, chars)
|
|
msg(repr(nops))
|
|
|
|
return
|
|
|
|
def payload(self, *arg):
|
|
"""
|
|
Generate various type of ROP payload using ret2plt
|
|
Usage:
|
|
MYNAME copybytes (generate function template for ret2strcpy style payload)
|
|
MYNAME copybytes dest1 data1 dest2 data2 ...
|
|
"""
|
|
(option,) = normalize_argv(arg, 1)
|
|
if option is None:
|
|
self._missing_argument()
|
|
|
|
if option == "copybytes":
|
|
result = peda.payload_copybytes(template=1) # function template
|
|
arg = arg[1:]
|
|
while len(arg) > 0:
|
|
(target, data) = normalize_argv(arg, 2)
|
|
if data is None:
|
|
break
|
|
if to_int(data) is None:
|
|
if data[0] == "[" and data[-1] == "]":
|
|
data = eval(data)
|
|
data = list2hexstr(data, peda.intsize())
|
|
else:
|
|
data = "0x%x" % data
|
|
result += peda.payload_copybytes(target, data)
|
|
arg = arg[2:]
|
|
|
|
if not result:
|
|
msg("Failed to construct payload")
|
|
else:
|
|
text = ""
|
|
indent = to_int(config.Option.get("indent"))
|
|
for line in result.splitlines():
|
|
text += " "*indent + line + "\n"
|
|
msg(text)
|
|
filename = peda.get_config_filename("payload")
|
|
open(filename, "w").write(text)
|
|
|
|
return
|
|
payload.options = ["copybytes"]
|
|
|
|
def snapshot(self, *arg):
|
|
"""
|
|
Save/restore process's snapshot to/from file
|
|
Usage:
|
|
MYNAME save file
|
|
MYNAME restore file
|
|
Warning: this is not thread safe, do not use with multithread program
|
|
"""
|
|
options = ["save", "restore"]
|
|
(opt, filename) = normalize_argv(arg, 2)
|
|
if opt not in options:
|
|
self._missing_argument()
|
|
|
|
if not filename:
|
|
filename = peda.get_config_filename("snapshot")
|
|
|
|
if opt == "save":
|
|
if peda.save_snapshot(filename):
|
|
msg("Saved process's snapshot to filename '%s'" % filename)
|
|
else:
|
|
msg("Failed to save process's snapshot")
|
|
|
|
if opt == "restore":
|
|
if peda.restore_snapshot(filename):
|
|
msg("Restored process's snapshot from filename '%s'" % filename)
|
|
peda.execute("stop")
|
|
else:
|
|
msg("Failed to restore process's snapshot")
|
|
|
|
return
|
|
snapshot.options = ["save", "restore"]
|
|
|
|
def crashdump(self, *arg):
|
|
"""
|
|
Display crashdump info and save to file
|
|
Usage:
|
|
MYNAME [reason_text]
|
|
"""
|
|
(reason,) = normalize_argv(arg, 1)
|
|
if not reason:
|
|
reason = "Interactive dump"
|
|
|
|
logname = peda.get_config_filename("crashlog")
|
|
logfd = open(logname, "a")
|
|
config.Option.set("_teefd", logfd)
|
|
msg("[%s]" % "START OF CRASH DUMP".center(78, "-"))
|
|
msg("Timestamp: %s" % time.ctime())
|
|
msg("Reason: %s" % red(reason))
|
|
|
|
# exploitability
|
|
pc = peda.getreg("pc")
|
|
if not peda.is_address(pc):
|
|
exp = red("EXPLOITABLE")
|
|
else:
|
|
exp = "Unknown"
|
|
msg("Exploitability: %s" % exp)
|
|
|
|
# registers, code, stack
|
|
self.context_register()
|
|
self.context_code(16)
|
|
self.context_stack()
|
|
|
|
# backtrace
|
|
msg("[%s]" % "backtrace (innermost 10 frames)".center(78, "-"), "blue")
|
|
msg(peda.execute_redirect("backtrace 10"))
|
|
|
|
msg("[%s]\n" % "END OF CRASH DUMP".center(78, "-"))
|
|
config.Option.set("_teefd", "")
|
|
logfd.close()
|
|
|
|
return
|
|
|
|
def utils(self, *arg):
|
|
"""
|
|
Miscelaneous utilities from utils module
|
|
Usage:
|
|
MYNAME command arg
|
|
"""
|
|
(command, carg) = normalize_argv(arg, 2)
|
|
cmds = ["int2hexstr", "list2hexstr", "str2intlist"]
|
|
if not command or command not in cmds or not carg:
|
|
self._missing_argument()
|
|
|
|
func = globals()[command]
|
|
if command == "int2hexstr":
|
|
if to_int(carg) is None:
|
|
msg("Not a number")
|
|
return
|
|
result = func(to_int(carg))
|
|
result = to_hexstr(result)
|
|
|
|
if command == "list2hexstr":
|
|
if to_int(carg) is not None:
|
|
msg("Not a list")
|
|
return
|
|
result = func(eval("%s" % carg))
|
|
result = to_hexstr(result)
|
|
|
|
if command == "str2intlist":
|
|
res = func(carg)
|
|
result = "["
|
|
for v in res:
|
|
result += "%s, " % to_hex(v)
|
|
result = result.rstrip(", ") + "]"
|
|
|
|
msg(result)
|
|
return
|
|
utils.options = ["int2hexstr", "list2hexstr", "str2intlist"]
|
|
|
|
###########################################################################
|
|
class pedaGDBCommand(gdb.Command):
|
|
"""
|
|
Wrapper of gdb.Command for master "peda" command
|
|
"""
|
|
def __init__(self, cmdname="peda"):
|
|
self.cmdname = cmdname
|
|
self.__doc__ = pedacmd._get_helptext()
|
|
super(pedaGDBCommand, self).__init__(self.cmdname, gdb.COMMAND_DATA)
|
|
|
|
def invoke(self, arg_string, from_tty):
|
|
# do not repeat command
|
|
self.dont_repeat()
|
|
arg = peda.string_to_argv(arg_string)
|
|
if len(arg) < 1:
|
|
pedacmd.help()
|
|
else:
|
|
cmd = arg[0]
|
|
if cmd in pedacmd.commands:
|
|
func = getattr(pedacmd, cmd)
|
|
try:
|
|
# reset memoized cache
|
|
reset_cache(sys.modules['__main__'])
|
|
func(*arg[1:])
|
|
except Exception as e:
|
|
if config.Option.get("debug") == "on":
|
|
msg("Exception: %s" %e)
|
|
traceback.print_exc()
|
|
peda.restore_user_command("all")
|
|
pedacmd.help(cmd)
|
|
else:
|
|
msg("Undefined command: %s. Try \"peda help\"" % cmd)
|
|
return
|
|
|
|
def complete(self, text, word):
|
|
completion = []
|
|
if text != "":
|
|
cmd = text.split()[0]
|
|
if cmd in pedacmd.commands:
|
|
func = getattr(pedacmd, cmd)
|
|
for opt in func.options:
|
|
if word in opt:
|
|
completion += [opt]
|
|
else:
|
|
for cmd in pedacmd.commands:
|
|
if cmd.startswith(text.strip()):
|
|
completion += [cmd]
|
|
else:
|
|
for cmd in pedacmd.commands:
|
|
if word in cmd and cmd not in completion:
|
|
completion += [cmd]
|
|
return completion
|
|
|
|
|
|
###########################################################################
|
|
class Alias(gdb.Command):
|
|
"""
|
|
Generic alias, create short command names
|
|
This doc should be changed dynamically
|
|
"""
|
|
def __init__(self, alias, command, shorttext=1):
|
|
(cmd, opt) = (command + " ").split(" ", 1)
|
|
if cmd == "peda" or cmd == "pead":
|
|
cmd = opt.split(" ")[0]
|
|
if not shorttext:
|
|
self.__doc__ = pedacmd._get_helptext(cmd)
|
|
else:
|
|
self.__doc__ = green("Alias for '%s'" % command)
|
|
self._command = command
|
|
self._alias = alias
|
|
super(Alias, self).__init__(alias, gdb.COMMAND_NONE)
|
|
|
|
def invoke(self, args, from_tty):
|
|
self.dont_repeat()
|
|
gdb.execute("%s %s" %(self._command, args))
|
|
|
|
def complete(self, text, word):
|
|
completion = []
|
|
cmd = self._command.split("peda ")[1]
|
|
for opt in getattr(pedacmd, cmd).options: # list of command's options
|
|
if text in opt and opt not in completion:
|
|
completion += [opt]
|
|
if completion != []:
|
|
return completion
|
|
if cmd in ["set", "show"] and text.split()[0] in ["option"]:
|
|
opname = [x for x in config.OPTIONS.keys() if x.startswith(word.strip())]
|
|
if opname != []:
|
|
completion = opname
|
|
else:
|
|
completion = list(config.OPTIONS.keys())
|
|
return completion
|
|
|
|
|
|
###########################################################################
|
|
## INITIALIZATION ##
|
|
# global instances of PEDA() and PEDACmd()
|
|
peda = PEDA()
|
|
pedacmd = PEDACmd()
|
|
pedacmd.help.__func__.options = pedacmd.commands # XXX HACK
|
|
|
|
# register "peda" command in gdb
|
|
pedaGDBCommand()
|
|
Alias("pead", "peda") # just for auto correction
|
|
|
|
# create aliases for subcommands
|
|
for cmd in pedacmd.commands:
|
|
func = getattr(pedacmd, cmd)
|
|
func.__func__.__doc__ = func.__doc__.replace("MYNAME", cmd)
|
|
if cmd not in ["help", "show", "set"]:
|
|
Alias(cmd, "peda %s" % cmd, 0)
|
|
|
|
# handle SIGINT / Ctrl-C
|
|
def sigint_handler(signal, frame):
|
|
warning_msg("Got Ctrl+C / SIGINT!")
|
|
gdb.execute("set logging off")
|
|
peda.restore_user_command("all")
|
|
raise KeyboardInterrupt
|
|
signal.signal(signal.SIGINT, sigint_handler)
|
|
|
|
# custom hooks
|
|
peda.define_user_command("hook-stop",
|
|
"peda context\n"
|
|
"session autosave"
|
|
)
|
|
|
|
# common used shell commands aliases
|
|
shellcmds = ["man", "ls", "ps", "grep", "cat", "more", "less", "pkill", "clear", "vi", "nano"]
|
|
for cmd in shellcmds:
|
|
Alias(cmd, "shell %s" % cmd)
|
|
|
|
# custom command aliases, add any alias you want
|
|
Alias("phelp", "peda help")
|
|
Alias("pset", "peda set")
|
|
Alias("pshow", "peda show")
|
|
Alias("pbreak", "peda pltbreak")
|
|
Alias("pattc", "peda pattern_create")
|
|
Alias("patto", "peda pattern_offset")
|
|
Alias("patta", "peda pattern_arg")
|
|
Alias("patte", "peda pattern_env")
|
|
Alias("patts", "peda pattern_search")
|
|
Alias("find", "peda searchmem") # override gdb find command
|
|
Alias("ftrace", "peda tracecall")
|
|
Alias("itrace", "peda traceinst")
|
|
Alias("jtrace", "peda traceinst j")
|
|
Alias("stack", "peda telescope $sp")
|
|
Alias("viewmem", "peda telescope")
|
|
Alias("reg", "peda xinfo register")
|
|
Alias("brva", "breakrva")
|
|
|
|
# misc gdb settings
|
|
peda.execute("set confirm off")
|
|
peda.execute("set verbose off")
|
|
peda.execute("set output-radix 0x10")
|
|
peda.execute("set prompt \001%s\002" % red("\002gdb-peda$ \001")) # custom prompt
|
|
peda.execute("set height 0") # disable paging
|
|
peda.execute("set history expansion on")
|
|
peda.execute("set history save on") # enable history saving
|
|
peda.execute("set disassembly-flavor intel")
|
|
peda.execute("set follow-fork-mode child")
|
|
peda.execute("set backtrace past-main on")
|
|
peda.execute("set step-mode on")
|
|
peda.execute("set print pretty on")
|
|
peda.execute("handle SIGALRM print nopass") # ignore SIGALRM
|
|
peda.execute("handle SIGSEGV stop print nopass") # catch SIGSEGV
|