small-package/UA2F/scripts/test.py

108 lines
2.7 KiB
Python
Raw Normal View History

2024-07-01 04:18:20 +08:00
import atexit
import http.server
2024-11-26 20:41:54 +08:00
import json
import logging
2024-07-01 04:18:20 +08:00
import os
2024-11-27 12:25:35 +08:00
import socket
2024-07-01 04:18:20 +08:00
import socketserver
import subprocess
import sys
import threading
import time
import requests
2024-11-26 16:27:33 +08:00
from fake_useragent import UserAgent
2024-11-26 20:41:54 +08:00
from tqdm import tqdm
2024-11-26 16:27:33 +08:00
ua = UserAgent()
2024-07-01 04:18:20 +08:00
PORT = 37491
class MyHandler(http.server.SimpleHTTPRequestHandler):
2024-11-26 20:41:54 +08:00
def log_message(self, format, *args):
pass
2024-07-01 04:18:20 +08:00
def do_GET(self):
user_agent = self.headers.get('User-Agent')
# assert user_agent only contains F
if not all([c == 'F' for c in user_agent]):
2024-11-26 16:27:33 +08:00
self.send_response(400)
2024-11-26 20:41:54 +08:00
logging.error(f"Invalid User-Agent: {user_agent}")
else:
self.send_response(200)
2024-07-01 04:18:20 +08:00
self.end_headers()
2024-11-26 16:27:33 +08:00
ua_len = len(user_agent)
self.wfile.write(str(ua_len).encode())
2024-07-01 04:18:20 +08:00
def start_server():
2024-11-27 12:25:35 +08:00
with socketserver.TCPServer(('', PORT), MyHandler, bind_and_activate=False) as httpd:
httpd.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
httpd.server_bind()
httpd.server_activate()
print(f"Serving on port {PORT}")
2024-07-01 04:18:20 +08:00
httpd.serve_forever()
atexit.register(httpd.shutdown)
def start_ua2f(u: str):
p = subprocess.Popen([u])
atexit.register(lambda: p.kill())
def setup_iptables():
os.system(f"sudo iptables -A OUTPUT -p tcp --dport {PORT} -j NFQUEUE --queue-num 10010")
2024-11-27 12:25:35 +08:00
os.system(f"sudo ip6tables -A OUTPUT -p tcp --dport {PORT} -j NFQUEUE --queue-num 10010")
2024-07-01 04:18:20 +08:00
def cleanup_iptables():
os.system(f"sudo iptables -D OUTPUT -p tcp --dport {PORT} -j NFQUEUE --queue-num 10010")
2024-11-27 12:25:35 +08:00
os.system(f"sudo ip6tables -D OUTPUT -p tcp --dport {PORT} -j NFQUEUE --queue-num 10010")
2024-07-01 04:18:20 +08:00
if __name__ == "__main__":
if os.name != 'posix':
raise Exception("This script only supports Linux")
if os.geteuid() != 0:
raise Exception("This script requires root privileges")
ua2f = sys.argv[1]
setup_iptables()
server_thread = threading.Thread(target=start_server)
server_thread.daemon = True
server_thread.start()
print(f"Starting server on port {PORT}")
ua2f_thread = threading.Thread(target=start_ua2f, args=(ua2f,))
ua2f_thread.daemon = True
ua2f_thread.start()
print(f"Starting UA2F: {ua2f}")
2024-11-26 20:41:54 +08:00
time.sleep(3)
2024-07-01 04:18:20 +08:00
2024-11-27 10:57:37 +08:00
for i in tqdm(range(2000)):
2024-11-26 16:27:33 +08:00
nxt = ua.random
2024-11-27 10:57:37 +08:00
response = requests.get(f"http://127.0.0.1:{PORT}", headers={
2024-11-26 16:27:33 +08:00
"User-Agent": nxt
})
assert response.ok
assert response.text == str(len(nxt))
2024-07-01 04:18:20 +08:00
2024-11-27 12:25:35 +08:00
for i in tqdm(range(2000)):
nxt = ua.random
response = requests.get(f"http://[::1]:{PORT}", headers={
"User-Agent": nxt
})
assert response.ok
assert response.text == str(len(nxt))
2024-11-27 10:57:37 +08:00
2024-07-01 04:18:20 +08:00
# clean
cleanup_iptables()