#!/usr/bin/env python3
import argparse
import ipaddress
import os
import random
import re
import socket
import time
from functools import lru_cache
from urllib.parse import quote, unquote, urljoin, urlparse, urlsplit, urlunsplit
import requests
from flask import Flask, Response, abort, make_response, request
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# -------- Config --------
# Scalar settings are read once, at import time, from same-named environment
# variables; the second argument of each lookup is the fallback value.

# Any non-zero integer makes _assert_public skip its address checks.
ALLOW_PRIVATE = int(os.environ.get("ALLOW_PRIVATE", "0")) != 0
# Passed as ``timeout=`` to every upstream request.
DEFAULT_TIMEOUT = int(os.environ.get("DEFAULT_TIMEOUT", "15"))
# Fallback is 10 MiB.
# NOTE(review): presumably caps the proxied body size; the consumer is not in
# this part of the file - confirm.
MAX_BYTES = int(os.environ.get("MAX_BYTES", str(10 * 1024**2)))
# Retry budget handed to urllib3's Retry(total=...).
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
# Number of redirect hops _fetch_final follows before aborting.
MAX_REDIRECTS = int(os.environ.get("MAX_REDIRECTS", "5"))
# Upstream status codes that trigger a retry (Retry(status_forcelist=...)).
RETRYABLE_CODES = {429, 500, 502, 503, 504}

# Browser User-Agent strings; _headers picks one at random per request.
UA_POOL = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
]

# NOTE(review): presumably the upstream response headers that may be copied
# back to the client; the consumer is not in this part of the file - confirm.
SAFE_ECHO = (
    "Content-Type",
    "Cache-Control",
    "Expires",
    "Last-Modified",
    "ETag",
    "Content-Disposition",
)
# -------- App & session --------
app = Flask(__name__)


def _retrying_adapter():
    """Return a new HTTPAdapter configured with the proxy's retry policy."""
    policy = Retry(
        total=MAX_RETRIES,
        status_forcelist=RETRYABLE_CODES,
        allowed_methods=frozenset(["GET"]),
        backoff_factor=1.0,
        respect_retry_after_header=True,
        # Once the budget is spent, hand back the last response instead of
        # raising, so the caller sees the upstream status.
        raise_on_status=False,
    )
    return HTTPAdapter(max_retries=policy, pool_maxsize=50)


# One module-level session so connection pools are reused between requests.
# Each scheme gets its own adapter (and its own Retry object).
# NOTE(review): a requests.Session also keeps one cookie jar for everything it
# fetches - confirm that sharing cookies across proxied requests is intended.
_session = requests.Session()
_session.mount("http://", _retrying_adapter())
_session.mount("https://", _retrying_adapter())
# -------- Small helpers (compact but explicit) --------
def _raw_site():
    """Return the target URL carried by the ``site`` query parameter.

    The raw query string is read (rather than parsed arguments) so that a
    literal ``+`` is preserved, and everything after the first ``site=`` -
    including any further ``&`` - is treated as part of the target URL.

    Returns:
        The percent-decoded value, or ``None`` when the query string has no
        ``site`` parameter.
    """
    qs = request.query_string.decode("latin-1", "ignore")
    # Accept "site=" only at a parameter boundary. A plain substring search
    # would also match keys that merely end in "site" (e.g. "website=x") and
    # return the wrong value.
    if qs.startswith("site="):
        start = len("site=")
    else:
        pos = qs.find("&site=")
        if pos == -1:
            return None
        start = pos + len("&site=")
    return unquote(qs[start:])
def _normalize(u: str) -> str:
    """Percent-encode the path and query of an absolute URL; drop the fragment.

    A string lacking either a scheme or a network location is returned
    untouched. An empty path becomes "/". "%" is kept as-is in both
    components, so escapes already present are not encoded a second time.
    """
    scheme, netloc, path, query, _fragment = urlsplit(u)
    if not (scheme and netloc):
        return u
    encoded_path = quote(path or "/", safe="/%:@")
    encoded_query = quote(query or "", safe="=&%+,:;@/?")
    # The fragment is never sent to a server, so it is omitted.
    return urlunsplit((scheme, netloc, encoded_path, encoded_query, ""))
@lru_cache(maxsize=512)
def _resolves(host: str):
    """Resolve *host* and return the set of IP address strings it maps to.

    Results, including failures, are memoized by ``lru_cache`` (up to 512
    hosts), so the returned set is shared between callers and must not be
    mutated.

    Returns:
        A set of address strings; empty when the name cannot be resolved.
    """
    try:
        infos = socket.getaddrinfo(host, None)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: CPython IDNA-encodes the name before looking it up and
        # rejects empty or over-long labels (e.g. "a..b"). Such a host is
        # simply unresolvable, not an internal error.
        return set()
    # Element 4 of each entry is the sockaddr tuple; its first item is the IP.
    return {info[4][0] for info in infos}
def _is_blocked_address(ip_str: str) -> bool:
    """Return True when *ip_str* is not a publicly routable IP address."""
    try:
        # Drop an IPv6 zone id ("fe80::1%eth0"): ipaddress rejects scoped
        # literals before Python 3.9, and the zone does not affect the checks.
        ip = ipaddress.ip_address(ip_str.split("%", 1)[0])
    except ValueError:
        # Fail closed: an address that cannot be classified is not allowed.
        return True
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_reserved
        or ip.is_link_local
        or ip.is_multicast
        # Catches ranges that are neither "private" nor public, notably the
        # 100.64.0.0/10 shared address space.
        or not ip.is_global
    )


def _assert_public(u: str):
    """Abort the request unless *u* is an http(s) URL with a public host.

    The URL must parse, use the ``http`` or ``https`` scheme and include a
    hostname. Unless ``ALLOW_PRIVATE`` is set, the hostname must also resolve,
    and every address it resolves to must be publicly routable.

    Raises:
        The HTTP 400 error raised by ``abort`` when any check fails.

    NOTE: the hostname is resolved here only for validation; the HTTP client
    resolves it again when connecting, so this check alone does not cover a
    DNS answer that changes between the two lookups.
    """
    try:
        parts = urlparse(u)
    except ValueError:
        abort(400, "Malformed URL")
    if parts.scheme not in ("http", "https"):
        abort(400, "URL must start with http:// or https://")
    if not parts.hostname:
        abort(400, "URL must include a hostname")
    if ALLOW_PRIVATE:
        return
    addrs = _resolves(parts.hostname)
    if not addrs:
        abort(400, "Hostname cannot be resolved")
    # A single non-public address is enough to reject the host.
    if any(_is_blocked_address(addr) for addr in addrs):
        abort(400, "Host resolves to a non-public address (blocked)")
def _headers():
    """Build the request headers sent with each upstream fetch.

    The User-Agent is drawn at random from ``UA_POOL`` on every call; all
    other headers are fixed.
    """
    fixed = (
        ("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"),
        ("Accept-Language", "en-US,en;q=0.9"),
        # Ask for an uncompressed body.
        ("Accept-Encoding", "identity"),
        ("Connection", "close"),
        ("Pragma", "no-cache"),
        ("Cache-Control", "no-cache"),
    )
    headers = {"User-Agent": random.choice(UA_POOL)}
    headers.update(fixed)
    return headers
def _fetch_final(url: str):
    """Fetch *url*, following redirects by hand, and return the last response.

    Every hop - the initial URL and each redirect target - goes through
    ``_assert_public`` before it is requested, so a redirect cannot lead to a
    host that the check rejects. The returned response was opened with
    ``stream=True`` and is left open for the caller.

    Aborts with HTTP 502 on a connection error, after more than
    ``MAX_REDIRECTS`` hops, or on a redirect whose Location is empty.
    """
    target = url
    followed = 0
    while True:
        _assert_public(target)
        try:
            resp = _session.get(
                target,
                headers=_headers(),
                timeout=DEFAULT_TIMEOUT,
                allow_redirects=False,
                stream=True,
            )
        except requests.RequestException as exc:
            abort(502, f"Upstream connection error: {exc}")

        if not (resp.is_redirect or resp.is_permanent_redirect):
            return resp

        # Intermediate responses are never read: grab the target, then release
        # the connection before deciding whether to continue.
        location = resp.headers.get("Location")
        resp.close()
        if followed >= MAX_REDIRECTS:
            abort(502, "Too many redirects")
        if not location:
            abort(502, "Redirect without Location")

        # Location may be relative; resolve it against the current URL.
        target = urljoin(target, location)
        followed += 1
        time.sleep(0.05)  # short pause between hops
# add this helper (compact, robust enough without full HTML parsing)
def _inject_base(html_bytes: bytes, base_url: str) -> bytes:
base_tag = b'