-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathparallel_decomp.py
More file actions
98 lines (85 loc) · 4.21 KB
/
parallel_decomp.py
File metadata and controls
98 lines (85 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os.path
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
import time
import pandas as pd
import glob
import subprocess
import argparse
import json
import sys
# Number of worker threads; each running job holds one Ghidra project.
thread_num = 10

# Shared pool that runs the per-binary decompilation jobs.
executor = ThreadPoolExecutor(max_workers=thread_num)

# Names of the Ghidra projects that are currently free. A name is popped
# before a job is submitted and appended back by the job when it ends.
ghidra_projects = [f'parser_{index}/' for index in range(thread_num)]
def process_unstripped_binary(ghidra_path, project_path, project_name, binary_path):
    """Decompile one unstripped binary with Ghidra's headless analyzer.

    Runs ``ghidra_path`` with the ``decomp_for_unstripped.py`` post-script
    and returns ``project_name`` to the shared ``ghidra_projects`` pool when
    the run ends, whether it completed, timed out, or could not be started.

    Args:
        ghidra_path: Path to Ghidra's ``analyzeHeadless`` launcher.
        project_path: Directory that holds the Ghidra projects.
        project_name: Name of the Ghidra project reserved for this run.
        binary_path: Path of the binary to import and decompile.
    """
    print(f"[*] hold {project_name} for {binary_path}")
    # Pass an argument list instead of a shell string so that paths containing
    # spaces or shell metacharacters reach Ghidra unchanged.
    cmd = [
        ghidra_path,
        project_path,
        project_name,
        "-import",
        binary_path,
        "-readOnly",
        "-postScript",
        "./decompilation/decomp_for_unstripped.py",
    ]
    try:
        # Give each binary at most one hour (900 * 4 seconds).
        subprocess.run(cmd, timeout=900 * 4)
    except subprocess.TimeoutExpired:
        print(f"[!] timeout for {binary_path}")
    except OSError as err:
        # Raised when the launcher is missing or not executable.
        print(f"[!] failed to run {ghidra_path} for {binary_path}: {err}")
    finally:
        # Always hand the project back; otherwise the submitting loop, which
        # waits for a free project, could stall forever after a failure.
        ghidra_projects.append(project_name)
        print(f"[+] release {project_name} after finishing {binary_path}")
def process_stripped_binary(ghidra_path, project_path, project_name, binary_path):
    """Decompile one stripped binary with Ghidra's headless analyzer.

    Runs ``ghidra_path`` with the ``decomp_for_stripped.py`` post-script and
    returns ``project_name`` to the shared ``ghidra_projects`` pool when the
    run ends, whether it completed, timed out, or could not be started.

    Args:
        ghidra_path: Path to Ghidra's ``analyzeHeadless`` launcher.
        project_path: Directory that holds the Ghidra projects.
        project_name: Name of the Ghidra project reserved for this run.
        binary_path: Path of the binary to import and decompile.
    """
    print(f"[*] hold {project_name} for {binary_path}")
    # Pass an argument list instead of a shell string so that paths containing
    # spaces or shell metacharacters reach Ghidra unchanged.
    cmd = [
        ghidra_path,
        project_path,
        project_name,
        "-import",
        binary_path,
        "-readOnly",
        "-postScript",
        "./decompilation/decomp_for_stripped.py",
    ]
    try:
        # Give each binary at most one hour (900 * 4 seconds).
        subprocess.run(cmd, timeout=900 * 4)
    except subprocess.TimeoutExpired:
        print(f"[!] timeout for {binary_path}")
    except OSError as err:
        # Raised when the launcher is missing or not executable.
        print(f"[!] failed to run {ghidra_path} for {binary_path}: {err}")
    finally:
        # Always hand the project back; otherwise the submitting loop, which
        # waits for a free project, could stall forever after a failure.
        ghidra_projects.append(project_name)
        print(f"[+] release {project_name} after finishing {binary_path}")
def _submit_binary(args, binary_file_path):
    """Reserve a free Ghidra project and queue one binary for decompilation."""
    print(f"[+] start to process {binary_file_path}")
    # Every job needs its own project; poll until a finished job returns one.
    while not ghidra_projects:
        print("Wait for ghidra project: 1 sec")
        time.sleep(1)
    ghidra_project = ghidra_projects.pop()
    worker = process_unstripped_binary if args.unstripped else process_stripped_binary
    executor.submit(
        worker,
        ghidra_path=args.ghidra_path,
        project_path=args.project_path,
        project_name=ghidra_project,
        binary_path=binary_file_path,
    )


def main(args):
    """Queue decompilation jobs for a single binary or a directory tree.

    If ``args.binary_path`` is a file, that file is submitted. If it is a
    directory, every file found by walking it is submitted. Otherwise a hint
    is printed and nothing is submitted.

    Args:
        args: Parsed command-line namespace providing ``binary_path``,
            ``ghidra_path``, ``project_path`` and ``unstripped``.
    """
    binary_path = args.binary_path
    if os.path.isfile(binary_path):
        _submit_binary(args, binary_path)
    elif os.path.isdir(binary_path):
        for root, _dirs, files in os.walk(binary_path):
            for file in files:
                # Submit the discovered file itself, not the directory that
                # was given on the command line.
                _submit_binary(args, os.path.join(root, file))
                # Throttle submission while the executor has a backlog.
                while executor._work_queue.qsize() > thread_num:
                    print("Wait for executor: 1 sec", executor._work_queue.qsize())
                    time.sleep(1)
    else:
        print(f"Check your {binary_path}.")
if __name__ == '__main__':
    # Command-line entry point: parse options, make sure the Ghidra project
    # directory exists, then queue the decompilation jobs.
    parser = argparse.ArgumentParser(description='Perform parallel decompilation and disassembling for binaries')
    # Exactly one mode is required. argparse rejects "both" and "neither"
    # with a usage message and a non-zero exit status, so a wrong invocation
    # is no longer reported as success.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument('-u', '--unstripped', action='store_true',
                      help="Indicates that the binary is unstripped (contains debug symbols).")
    mode.add_argument('-s', '--stripped', action='store_true',
                      help="Indicates that the binary is stripped (lacks debug symbols).")
    parser.add_argument('-b', '--binary_path', type=str, required=True,
                        help="Specify the path to the binary file or folder containing binaries.")
    parser.add_argument('-g', '--ghidra_path', type=str, required=True,
                        help="Provide the path to the Ghidra 'analyzeHeadless'.")
    parser.add_argument('-p', '--project_path', type=str, required=True,
                        help="Specify the directory path to Ghidra projects.")
    args = parser.parse_args()

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(args.project_path, exist_ok=True)
    main(args)