small-package/UA2F/scripts/test.py

104 lines
2.6 KiB
Python
Raw Normal View History

2024-07-01 04:18:20 +08:00
import atexit
import http.server
2024-11-26 20:41:54 +08:00
import json
import logging
2024-07-01 04:18:20 +08:00
import os
2024-11-27 12:25:35 +08:00
import socket
2024-07-01 04:18:20 +08:00
import socketserver
import subprocess
import sys
import threading
import time
2024-11-27 14:16:39 +08:00
from http.server import HTTPServer, BaseHTTPRequestHandler
2024-07-01 04:18:20 +08:00
import requests
2024-11-26 16:27:33 +08:00
from fake_useragent import UserAgent
2024-11-26 20:41:54 +08:00
from tqdm import tqdm
2024-11-27 14:16:39 +08:00
from fastapi import FastAPI, Request
from fastapi.responses import Response
from fastapi.staticfiles import StaticFiles
from uvicorn import Config, Server
2024-11-26 16:27:33 +08:00
# Shared fake_useragent instance; the request loops in the __main__ section
# read `ua.random` to obtain the User-Agent string sent with each request.
ua = UserAgent()
2024-07-01 04:18:20 +08:00
# TCP port used throughout this script: the local uvicorn servers bind to it
# and the iptables/ip6tables NFQUEUE rules match on it as destination port.
PORT = 37491
2024-11-27 14:16:39 +08:00
# FastAPI application holding the "/" route; start_server() hands it to uvicorn.
app = FastAPI()
2024-11-26 20:41:54 +08:00
2024-11-27 14:16:39 +08:00
@app.get("/")
async def root(request: Request):
    """Check that every character of the request's User-Agent header is 'F'.

    Args:
        request: Incoming request; only its ``user-agent`` header is read.

    Returns:
        A 400 response with no body when the header is missing or contains
        any character other than 'F'. Otherwise a default-status response
        whose body is the decimal length of the header value.
    """
    user_agent = request.headers.get("user-agent")

    # A request without a User-Agent header yields None here. Iterating None
    # would raise TypeError and surface as a 500, so reject it explicitly.
    if user_agent is None or any(c != 'F' for c in user_agent):
        return Response(status_code=400)

    return Response(content=str(len(user_agent)).encode())
2024-07-01 04:18:20 +08:00
def start_server():
    """Serve ``app`` on the IPv4 and IPv6 loopback addresses in daemon threads.

    One uvicorn ``Server`` is built per address (127.0.0.1 and ::1), both on
    ``PORT`` with access logging disabled. The function returns as soon as
    the threads have been started; it does not wait for the servers to begin
    accepting connections.
    """
    servers = [
        Server(Config(app=app, host=host, port=PORT, access_log=False))
        for host in ("127.0.0.1", "::1")
    ]
    # Daemon threads so the servers never keep the interpreter alive.
    workers = [threading.Thread(target=srv.run, daemon=True) for srv in servers]
    for worker in workers:
        worker.start()
2024-07-01 04:18:20 +08:00
def start_ua2f(u: str):
    """Spawn the executable at ``u`` with no arguments and kill it at exit.

    Args:
        u: Path of the program to launch.
    """
    proc = subprocess.Popen([u])
    # Register the bound method directly; atexit calls it with no arguments.
    atexit.register(proc.kill)
def setup_iptables():
    """Append OUTPUT rules that send TCP traffic for ``PORT`` to NFQUEUE 10010.

    The rule is added with ``iptables`` first and ``ip6tables`` second, each
    through ``sudo``. Exit statuses of the commands are not inspected.
    """
    rule = f"-A OUTPUT -p tcp --dport {PORT} -j NFQUEUE --queue-num 10010"
    for tool in ("iptables", "ip6tables"):
        os.system(f"sudo {tool} {rule}")
2024-07-01 04:18:20 +08:00
def cleanup_iptables():
    """Delete the OUTPUT rules that send TCP traffic for ``PORT`` to NFQUEUE 10010.

    The rule is removed with ``iptables`` first and ``ip6tables`` second, each
    through ``sudo``. Exit statuses of the commands are not inspected.
    """
    rule = f"-D OUTPUT -p tcp --dport {PORT} -j NFQUEUE --queue-num 10010"
    for tool in ("iptables", "ip6tables"):
        os.system(f"sudo {tool} {rule}")
2024-07-01 04:18:20 +08:00
if __name__ == "__main__":
    # Script entry point: install the NFQUEUE rules, start the local servers
    # and the UA2F binary given as the first argument, then send requests with
    # random User-Agent values over IPv4 and IPv6 and check each response.
    if os.name != 'posix':
        raise Exception("This script only supports Linux")
    if os.geteuid() != 0:
        raise Exception("This script requires root privileges")
    if len(sys.argv) < 2:
        # Exit with a usage line instead of a bare IndexError on sys.argv[1].
        sys.exit(f"usage: {sys.argv[0]} <path-to-ua2f-binary>")

    ua2f = sys.argv[1]

    # try/finally guarantees the iptables rules are removed even when a
    # request or assertion below fails; otherwise they would stay installed
    # after the script aborts.
    try:
        setup_iptables()
        start_server()

        ua2f_thread = threading.Thread(target=start_ua2f, args=(ua2f,))
        ua2f_thread.daemon = True
        ua2f_thread.start()
        print(f"Starting UA2F: {ua2f}")

        # Fixed delay before sending traffic, presumably to let the uvicorn
        # servers and UA2F finish starting.
        time.sleep(3)

        # (base URL, number of requests): IPv4 loopback first, then IPv6.
        targets = (
            (f"http://127.0.0.1:{PORT}", 1024),
            (f"http://[::1]:{PORT}", 4096),
        )
        for base_url, rounds in targets:
            for _ in tqdm(range(rounds)):
                nxt = ua.random
                response = requests.get(base_url, headers={
                    "User-Agent": nxt
                })
                # The server answers 400 unless every User-Agent character
                # is 'F', and otherwise echoes the length it received.
                assert response.ok, f"{base_url}: HTTP {response.status_code}"
                assert response.text == str(len(nxt)), (
                    f"{base_url}: expected length {len(nxt)}, got {response.text!r}"
                )
    finally:
        # clean
        cleanup_iptables()