#!/usr/bin/env python3
import argparse
import ipaddress
import os
import random
import re
import socket
import time
from functools import lru_cache
from urllib.parse import quote, unquote, urljoin, urlparse, urlsplit, urlunsplit
import requests
from flask import Flask, Response, abort, make_response, request
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# -------- Config --------
ALLOW_PRIVATE = bool(int(os.getenv("ALLOW_PRIVATE", "0")))
DEFAULT_TIMEOUT = int(os.getenv("DEFAULT_TIMEOUT", "15"))
MAX_BYTES = int(os.getenv("MAX_BYTES", str(10 * 1024 * 1024)))
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
MAX_REDIRECTS = int(os.getenv("MAX_REDIRECTS", "5"))
RETRYABLE_CODES = {429, 500, 502, 503, 504}
UA_POOL = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
]
SAFE_ECHO = (
    "Content-Type",
    "Cache-Control",
    "Expires",
    "Last-Modified",
    "ETag",
    "Content-Disposition",
)
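# All limits above come from environment variables, so they can be raised at
# launch time without editing the file, for example (the filename miniproxy.py
# is illustrative, not fixed by this script):
#   MAX_BYTES=26214400 MAX_REDIRECTS=10 python miniproxy.py --port 9000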
# -------- App & session --------
app = Flask(__name__)
_session = requests.Session()
# One retry policy and adapter shared by both schemes (equivalent to mounting
# two identically configured adapters).
_retry = Retry(
    total=MAX_RETRIES,
    status_forcelist=RETRYABLE_CODES,
    allowed_methods=frozenset(["GET"]),
    backoff_factor=1.0,
    respect_retry_after_header=True,
    raise_on_status=False,
)
_adapter = HTTPAdapter(max_retries=_retry, pool_maxsize=50)
_session.mount("http://", _adapter)
_session.mount("https://", _adapter)
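# Note: with backoff_factor=1.0 the retries sleep roughly exponentially between
# attempts (the exact schedule depends on the installed urllib3 version), and
# Retry-After headers on 429/503 responses are honoured.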
# -------- Small helpers (compact but explicit) --------
def _raw_site():
    # Preserve '+' by reading the raw query string; take everything after the
    # first 'site=' so an unencoded target URL may keep its own query string.
    qs = request.query_string.decode("latin-1", "ignore")
    m = re.search(r"(?:^|&)site=(.*)", qs)
    return unquote(m.group(1)) if m else None
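# Why _raw_site reads the raw query string: request.args would already have
# decoded '+' to a space and split on '&', so a request such as
#   /?site=https://example.com/search?q=a+b&page=2
# would lose both the literal '+' and the target's own '&page=2' parameter.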
def _normalize(u: str) -> str:
    p = urlsplit(u)
    if not p.scheme or not p.netloc:
        return u
    return urlunsplit(
        (
            p.scheme,
            p.netloc,
            quote(p.path or "/", safe="/%:@"),
            quote(p.query or "", safe="=&%+,:;@/?"),
            "",
        )
    )
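# Example of what _normalize does (any fragment is dropped by design):
#   _normalize("https://example.com/a b?q=x y")
#     -> "https://example.com/a%20b?q=x%20y"
# Already-encoded input passes through unchanged because '%' is in the safe set.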
@lru_cache(maxsize=512)
def _resolves(host: str):
    try:
        return {ai[4][0] for ai in socket.getaddrinfo(host, None)}
    except socket.gaierror:
        return set()
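# Caveat: lru_cache means a hostname is resolved at most once per process (up
# to 512 distinct hosts), so later DNS changes are not observed until restart.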
def _assert_public(u: str):
    try:
        p = urlparse(u)
    except Exception:
        abort(400, "Malformed URL")
    if p.scheme not in ("http", "https"):
        abort(400, "URL must start with http:// or https://")
    if not p.hostname:
        abort(400, "URL must include a hostname")
    if ALLOW_PRIVATE:
        return
    addrs = _resolves(p.hostname)
    if not addrs:
        abort(400, "Hostname cannot be resolved")
    for ip_str in addrs:
        ip = ipaddress.ip_address(ip_str)
        if (
            ip.is_private
            or ip.is_loopback
            or ip.is_reserved
            or ip.is_link_local
            or ip.is_multicast
        ):
            abort(400, "Host resolves to a non-public address (blocked)")
def _headers():
    return {
        "User-Agent": random.choice(UA_POOL),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "identity",
        "Connection": "close",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache",
    }
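# "Accept-Encoding: identity" asks the upstream not to compress, so the
# MAX_BYTES cap below is applied to the bytes actually forwarded to the client
# rather than to a gzipped payload of unpredictable expanded size.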
def _fetch_final(url: str):
    # Follow redirects manually; validate each hop before requesting it
    hops, cur = 0, url
    while True:
        _assert_public(cur)
        try:
            r = _session.get(
                cur,
                headers=_headers(),
                timeout=DEFAULT_TIMEOUT,
                allow_redirects=False,
                stream=True,
            )
        except requests.RequestException as e:
            abort(502, f"Upstream connection error: {e}")
        if r.is_redirect or r.is_permanent_redirect:
            if hops >= MAX_REDIRECTS:
                r.close()
                abort(502, "Too many redirects")
            loc = r.headers.get("Location")
            r.close()
            if not loc:
                abort(502, "Redirect without Location")
            cur = urljoin(cur, loc)
            hops += 1
            time.sleep(0.05)
            continue
        return r
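# Re-validating every hop matters because a public host may answer with a 30x
# pointing at an internal address (e.g. a redirect to http://169.254.169.254/);
# requests' built-in allow_redirects=True would follow it without this check.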
# Compact <base>-injection helper; robust enough without a full HTML parser.
def _inject_base(html_bytes: bytes, base_url: str) -> bytes:
    base_tag = b'<base href="' + base_url.encode("utf-8", "ignore") + b'">'
    # Find <head> ... </head>
    m_head_open = re.search(br"<head[^>]*>", html_bytes, flags=re.I)
    if m_head_open:
        head_end = re.search(br"</head\s*>", html_bytes, flags=re.I)
        end_idx = head_end.start() if head_end else m_head_open.end()
        # Check whether a <base> tag already exists inside <head>
        if re.search(br"<base\b", html_bytes[m_head_open.end():end_idx], flags=re.I):
            return html_bytes
        return html_bytes[:m_head_open.end()] + base_tag + html_bytes[m_head_open.end():]
    # No <head>; try right after <html>
    m_html_open = re.search(br"<html[^>]*>", html_bytes, flags=re.I)
    if m_html_open:
        return (html_bytes[:m_html_open.end()]
                + b"<head>" + base_tag + b"</head>"
                + html_bytes[m_html_open.end():])
    # No <html> either; prepend (rare but safe)
    return b"<head>" + base_tag + b"</head>" + html_bytes
# -------- Route --------
@app.route("/", methods=["GET"])
def root():
site = _raw_site()
if not site:
abort(400, "Missing required query parameter: site")
site = _normalize(site)
_assert_public(site)
upstream = _fetch_final(site)
# Early size guard via Content-Length
cl = upstream.headers.get("Content-Length")
if cl and cl.isdigit() and int(cl) > MAX_BYTES:
upstream.close()
abort(502, "Upstream response too large")
# Stream & cap
total, buf = 0, []
try:
for chunk in upstream.iter_content(64 * 1024):
if not chunk:
continue
total += len(chunk)
if total > MAX_BYTES:
upstream.close()
abort(502, "Upstream response too large")
buf.append(chunk)
finally:
upstream.close()
body = b"".join(buf)
ctype = upstream.headers.get("Content-Type", "")
if "text/html" in ctype.lower():
body = _inject_base(body, upstream.url)
out = make_response(body, upstream.status_code)
out.headers["X-Proxied-From"] = site
for h in SAFE_ECHO:
if h in upstream.headers:
out.headers[h] = upstream.headers[h]
out.headers.setdefault("Content-Type", "application/octet-stream")
out.headers["Content-Length"] = str(len(body))
out.headers["X-Content-Type-Options"] = "nosniff"
return out
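# Quick manual check once the server is running (host/port below are the
# argparse defaults; adjust if you change them):
#   curl -i 'http://127.0.0.1:8888/?site=https://example.com/'
# should return example.com's HTML with an injected <base> tag and an
# X-Proxied-From header.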
# Plain-text 4xx/5xx
@app.errorhandler(400)
@app.errorhandler(502)
def _err(e):
    return Response(
        f"{e.code} {e.name}: {getattr(e, 'description', str(e))}\n",
        status=e.code,
        mimetype="text/plain; charset=utf-8",
    )
# -------- CLI --------
def main():
    ap = argparse.ArgumentParser(description="Simple HTTPS proxy via Flask (compact)")
    ap.add_argument("--port", type=int, default=8888)
    ap.add_argument("--host", default="127.0.0.1")
    a = ap.parse_args()
    print(
        f"MiniProxy (Flask) http://{a.host}:{a.port} (ALLOW_PRIVATE={'1' if ALLOW_PRIVATE else '0'})"
    )
    app.run(host=a.host, port=a.port, threaded=True)
if __name__ == "__main__":
    main()