delete python loadbalancer

Cyberes 2024-04-12 18:36:11 -06:00
parent 89c0e16379
commit 74d18e781a
12 changed files with 0 additions and 651 deletions


@ -1,45 +0,0 @@
# Skeleton App
This directory contains a sample standalone application structure which uses `proxy.py`
via the `requirements.txt` file.
## Setup
```console
$ git clone https://github.com/abhinavsingh/proxy.py.git
$ cd proxy.py/skeleton
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt
```
## Run It
`python -m app`
Start your app and make a web request to `/` and a proxy request via the instance. You will
see log lines like this:
```console
$ python -m app
...[redacted]... - Loaded plugin proxy.http.proxy.HttpProxyPlugin
...[redacted]... - Loaded plugin proxy.http.server.HttpWebServerPlugin
...[redacted]... - Loaded plugin app.plugins.MyWebServerPlugin
...[redacted]... - Loaded plugin app.plugins.MyProxyPlugin
...[redacted]... - Listening on 127.0.0.1:9000
...[redacted]... - Started 16 acceptors in threadless (local) mode
...[redacted]... - HttpProtocolException: HttpRequestRejected b"I'm a tea pot"
...[redacted]... - 127.0.0.1:64601 - GET None:None/get - None None - 0 bytes - 0.64ms
...[redacted]... - 127.0.0.1:64622 - GET / - curl/7.77.0 - 0.95ms
```
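For example, with the app listening on its default port `9000`, requests like the following should produce similar log lines (`httpbin.org/get` is only an illustrative upstream target):
```console
$ curl http://localhost:9000/
$ curl -x localhost:9000 http://httpbin.org/get
```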
Voila!!!

That is your custom app skeleton structure built on top of `proxy.py`. Now copy the `app` directory
outside of the `proxy.py` repo and create your own git repo, then customize `app` for your project's needs.
## Run in detached (background) mode
1. For one-off use cases, you can directly start the app in the background with:

   `python -m app 2>&1 &`

2. For production usage, you likely want a process control manager, e.g. supervisord or systemd (see the sketch below).
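For the systemd route, a minimal unit file might look like this sketch (all paths are placeholders for your own checkout and virtualenv):
```ini
[Unit]
Description=proxy.py skeleton app
After=network.target

[Service]
# Placeholder paths: adjust to your deployment.
WorkingDirectory=/opt/skeleton
ExecStart=/opt/skeleton/.venv/bin/python -m app
Restart=always

[Install]
WantedBy=multi-user.target
```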


@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~

    Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
from .app import entry_point

__all__ = [
    'entry_point',
]


@ -1,15 +0,0 @@
# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~

    Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
from app import entry_point

if __name__ == '__main__':
    entry_point()


@ -1,58 +0,0 @@
# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~

    Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
import ipaddress
import logging
import threading
import time

import coloredlogs
from proxy import proxy
from redis import Redis

from .background import validate_proxies

coloredlogs.install(level='INFO')


def entry_point() -> None:
    logger = logging.getLogger(__name__)
    redis = Redis(host='localhost', port=6379, decode_responses=True)
    redis.flushall()
    redis.set('balancer_online', 0)
    redis.set('suicide_online', 0)
    threading.Thread(target=validate_proxies, daemon=True).start()
    time.sleep(5)
    while not int(redis.get('balancer_online')):
        logger.warning('Waiting for background thread to populate proxies...')
        time.sleep(5)
    # suicide.SUICIDE_PACT.pact = threading.Thread(target=suicide.check_url_thread, args=("http://127.0.0.1:9000",), daemon=True)
    # suicide.SUICIDE_PACT.pact.start()
    with proxy.Proxy(
            enable_web_server=True,
            port=9000,
            timeout=300,
            hostname=ipaddress.IPv4Address('0.0.0.0'),
            # NOTE: Pass plugins via *args if you define custom flags.
            # Currently plugins passed via **kwargs are not discovered for
            # custom flags by proxy.py
            # See https://github.com/abhinavsingh/proxy.py/issues/871
            plugins=[
                'app.plugins.ProxyLoadBalancer',
            ],
            disable_headers=[
                b'smartproxy-bypass',
                b'smartproxy-disable-bv3hi',
            ],
    ) as _:
        proxy.sleep_loop()
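# Illustrative client usage (assumption: the balancer is running locally on port 9000):
#
#   curl -x localhost:9000 http://example.com                             # normal route
#   curl -x localhost:9000 -H 'smartproxy-bypass: 1' http://example.com   # force own proxies
#
# Both routing headers are stripped via disable_headers above before the request
# is forwarded upstream.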


@ -1,127 +0,0 @@
import concurrent.futures
import logging
import os
import random
import signal
import time

import requests
from redis import Redis

from .config import PROXY_POOL, SMARTPROXY_POOL, IP_CHECKER, MAX_PROXY_CHECKERS, SMARTPROXY_BV3HI_FIX
from .pid import zombie_slayer
from .redis_cycle import add_backend_cycler
from .smartproxy import transform_smartproxy

DEBUG_MODE = False

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'cross-site',
    'Pragma': 'no-cache',
    'Cache-Control': 'no-cache',
}


def validate_proxies():
    """
    Validate proxies by sending a request to the IP_CHECKER service and checking the resulting IP address.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    redis = Redis(host='localhost', port=6379, decode_responses=True)
    logger.info('Doing initial backend check, please wait...')
    started = False
    while True:
        # Health checks. If one of these fails, the process is killed to be restarted by systemd.
        if int(redis.get('balancer_online')):
            zombie_slayer()
            try:
                response = requests.get('http://localhost:9000', headers={'User-Agent': 'HEALTHCHECK'}, timeout=10)
                if response.status_code != 404:
                    logger.critical(f"Frontend HTTP check failed with status code: {response.status_code}")
                    os.kill(os.getpid(), signal.SIGKILL)
            except requests.exceptions.RequestException as e:
                logger.critical(f"Frontend HTTP check failed with exception: {e}")
                os.kill(os.getpid(), signal.SIGKILL)

        our_online_backends = {}
        smartproxy_online_backends = {}
        smartproxy_broken_proxies = {}
        ip_addresses = set()

        def check_proxy(pxy):
            try:
                smartproxy = False
                if pxy in SMARTPROXY_POOL:
                    smartproxy = True
                    r = requests.get(IP_CHECKER, proxies={'http': transform_smartproxy(pxy), 'https': transform_smartproxy(pxy)}, timeout=15, headers=headers)
                else:
                    r = requests.get(IP_CHECKER, proxies={'http': pxy, 'https': pxy}, timeout=15, headers=headers)
                if r.status_code != 200:
                    logger.info(f'PROXY TEST failed - {pxy} - got code {r.status_code}')
                    return
            except Exception as e:
                logger.info(f'PROXY TEST failed - {pxy} - {e}')
                return
            ip = r.text
            if ip not in ip_addresses:
                proxy_dict = our_online_backends if not smartproxy else smartproxy_online_backends
                ip_addresses.add(ip)
                proxy_dict[pxy] = ip
            else:
                s = ' Smartproxy ' if smartproxy else ' '
                logger.warning(f'Duplicate{s}IP: {ip}')
                return

            # TODO: remove when fixed
            try:
                if smartproxy:
                    for d in SMARTPROXY_BV3HI_FIX:
                        r2 = requests.get(d, proxies={'http': transform_smartproxy(pxy), 'https': transform_smartproxy(pxy)}, timeout=15, headers=headers)
                        if r2.status_code != 200:
                            smartproxy_broken_proxies[pxy] = r.text
                            logger.info(f'PROXY BV3HI TEST failed - {pxy} - got code {r2.status_code}')
            except Exception as e:
                smartproxy_broken_proxies[pxy] = r.text
                logger.info(f'PROXY BV3HI TEST failed - {pxy} - {e}')

        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PROXY_CHECKERS) as executor:
            executor.map(check_proxy, set(PROXY_POOL) | set(SMARTPROXY_POOL))

        our_valid_proxies = list(our_online_backends.keys())

        # Remove the broken SmartProxy proxies from the working ones.
        sp_all = list(smartproxy_online_backends.keys())
        smartproxy_broken_proxies = list(smartproxy_broken_proxies.keys())
        smartproxy_valid_proxies = list(set(sp_all) - set(smartproxy_broken_proxies))
        all_valid_proxies = list(set(our_valid_proxies) | set(smartproxy_valid_proxies))
        all_valid_proxies_with_broken_smartproxy = list(set(all_valid_proxies) | set(sp_all))

        if not started:
            random.shuffle(all_valid_proxies)
            random.shuffle(our_valid_proxies)
            started = True

        add_backend_cycler('all_valid_proxies', all_valid_proxies)
        add_backend_cycler('our_valid_proxies', our_valid_proxies)
        add_backend_cycler('all_valid_proxies_with_broken_smartproxy', all_valid_proxies_with_broken_smartproxy)

        if DEBUG_MODE:
            logger.info(f'Our Backends Online ({len(our_valid_proxies)}): {our_valid_proxies}')
            logger.info(f'Smartproxy Backends Online ({len(smartproxy_valid_proxies)}): {smartproxy_valid_proxies}')
            logger.info(f'Smartproxy Broken Backends ({len(smartproxy_broken_proxies)}): {smartproxy_broken_proxies}')
        else:
            logger.info(f'Our Backends Online: {len(our_valid_proxies)}, Smartproxy Backends Online: {len(smartproxy_valid_proxies)}, Smartproxy Broken Backends: {len(smartproxy_broken_proxies)}, Total Online: {len(our_valid_proxies) + len(smartproxy_valid_proxies)}')

        redis.set('balancer_online', 1)
        time.sleep(60)


@ -1,33 +0,0 @@
# The address of the service used to check the IP address of each proxy.
IP_CHECKER = 'http://[your IP or domain name]:5000'

# How many threads to use when checking proxy health.
MAX_PROXY_CHECKERS = 50

PROXY_POOL = [
    "127.0.0.1:3128",
    "[other server]:3128",
]

# Your login for SmartProxy. Ignore if you aren't using the service.
SMARTPROXY_USER = 'example'
SMARTPROXY_PASS = 'password'

# Fix the 503 error on some SmartProxy hosts.
# TODO: remove when fixed.
SMARTPROXY_BV3HI_FIX = [
    'https://files.catbox.moe/1hvrlj.png'
]

# Some domains just don't work through SmartProxy. Domains in this list are routed through
# your proxies, not SmartProxy.
BYPASS_SMARTPROXY_DOMAINS = [
    'twitter.com'
]

# The domains that SmartProxy gives you to connect to their service.
# Leave the list empty to disable this feature.
SMARTPROXY_POOL = [
    # "dc.smartproxy.com:10001",
    # ...
]


@ -1,35 +0,0 @@
import logging
import os
import signal

import psutil

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def check_zombie(process):
    try:
        return process.status() == psutil.STATUS_ZOMBIE
    except psutil.NoSuchProcess:
        return False


def get_children_pids(pid):
    parent = psutil.Process(pid)
    children = parent.children(recursive=True)
    return [child.pid for child in children]


def zombie_slayer():
    pid = os.getpid()
    children_pids = get_children_pids(pid)
    zombies = []
    for child_pid in children_pids:
        child = psutil.Process(child_pid)
        if check_zombie(child):
            zombies.append(child_pid)
    if zombies:
        logger.critical(f"Zombie processes detected: {zombies}")
        logger.critical("Killing parent process to reap zombies...")
        os.kill(pid, signal.SIGKILL)


@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~

    Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
from .load_balancer import ProxyLoadBalancer

__all__ = [
    'ProxyLoadBalancer',
]


@ -1,262 +0,0 @@
import base64
import ipaddress
import logging
from typing import Any, Dict, List, Optional

from proxy.common.constants import (ANY_INTERFACE_HOSTNAMES, COLON, LOCAL_INTERFACE_HOSTNAMES)
from proxy.common.utils import text_
from proxy.core.base import TcpUpstreamConnectionHandler
from proxy.http import Url, httpHeaders, httpMethods
from proxy.http.exception import HttpProtocolException
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from redis import Redis

from ..config import SMARTPROXY_USER, SMARTPROXY_PASS, SMARTPROXY_POOL, BYPASS_SMARTPROXY_DOMAINS
from ..redis_cycle import redis_cycle

logger = logging.getLogger(__name__)

DEFAULT_HTTP_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
    '{request_method} {upstream_proxy_host}:{upstream_proxy_port} -> {server_host}:{server_port}{request_path} - ' + \
    '{response_code} {response_reason} - {response_bytes} bytes - ' + \
    '{connection_time_ms} ms'

DEFAULT_HTTPS_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
    '{request_method} {upstream_proxy_host}:{upstream_proxy_port} -> {server_host}:{server_port} - ' + \
    '{response_bytes} bytes - {connection_time_ms} ms'


class ProxyLoadBalancer(TcpUpstreamConnectionHandler, HttpProxyBasePlugin):
    """Proxy pool plugin simply acts as a proxy adapter for proxy.py itself.

    Imagine this plugin as setting up proxy settings for the proxy.py instance itself.
    All incoming client requests are proxied to the configured upstream proxies."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.redis = Redis(host='localhost', port=6379, decode_responses=True)
        self._endpoint: Url = Url()
        # Cached attributes to be used during access log override.
        self._metadata: List[Any] = [
            None, None, None, None,
        ]

    def handle_upstream_data(self, raw: memoryview) -> None:
        self.client.queue(raw)

    def before_upstream_connection(
            self, request: HttpParser,
    ) -> Optional[HttpParser]:
        """Avoids establishing the default connection to the upstream server
        by returning None.

        TODO(abhinavsingh): Ideally connections to upstream proxy endpoints
        must be bootstrapped within their own re-usable and garbage collected pool,
        to avoid establishing a new upstream proxy connection for each client request.

        See :class:`~proxy.core.connection.pool.UpstreamConnectionPool` which is a work
        in progress for SSL cache handling.
        """
        # Select the proxy to use.
        self._endpoint = self._select_proxy(
            request.host.decode(),
            request.has_header(b'smartproxy-bypass'),
            request.has_header(b'smartproxy-disable-bv3hi'),
        )
        request.del_header(b'smartproxy-bypass')
        request.del_header(b'smartproxy-disable-bv3hi')

        # We don't want to send private IP requests to remote proxies.
        try:
            if ipaddress.ip_address(text_(request.host)).is_private:
                return request
        except ValueError:
            pass

        # If the chosen proxy is the local instance, bypass upstream proxies.
        assert self._endpoint.port and self._endpoint.hostname
        if self._endpoint.port == self.flags.port and \
                self._endpoint.hostname in LOCAL_INTERFACE_HOSTNAMES + ANY_INTERFACE_HOSTNAMES:
            return request

        # Establish connection to the chosen upstream proxy.
        endpoint_tuple = (text_(self._endpoint.hostname), self._endpoint.port)
        logger.debug('Using endpoint: {0}:{1}'.format(*endpoint_tuple))
        self.initialize_upstream(*endpoint_tuple)
        assert self.upstream
        try:
            self.upstream.connect()
        except TimeoutError:
            raise HttpProtocolException(
                'Timed out connecting to upstream proxy {0}:{1}'.format(
                    *endpoint_tuple,
                ),
            )
        except ConnectionRefusedError:
            # TODO(abhinavsingh): Try another choice; when all (or the max configured) choices
            # are exhausted, retry for the configured number of times before giving up.
            #
            # Failing upstream proxies must be removed from the pool temporarily.
            # A periodic health check must put them back in the pool. This can be achieved
            # using a data structure without having to spawn a separate thread/process for
            # health checks.
            raise HttpProtocolException(
                'Connection refused by upstream proxy {0}:{1}'.format(
                    *endpoint_tuple,
                ),
            )
        logger.debug(
            'Established connection to upstream proxy {0}:{1}'.format(
                *endpoint_tuple,
            ),
        )
        return None

    def handle_client_request(
            self, request: HttpParser,
    ) -> Optional[HttpParser]:
        """Only invoked once after the client's original proxy request has been received completely."""
        if not self.upstream:
            return request
        assert self.upstream

        # For log sanity (i.e. to avoid None:None), expose the upstream host:port from headers.
        host, port = None, None
        # Browsers or applications may sometimes send
        #
        #   "CONNECT / HTTP/1.0\r\n\r\n"
        #
        # for proxy keep-alive checks.
        if request.has_header(b'host'):
            url = Url.from_bytes(request.header(b'host'))
            assert url.hostname
            host, port = url.hostname.decode('utf-8'), url.port
            port = port if port else (
                443 if request.is_https_tunnel else 80
            )
        else:
            # TODO: make sure this doesn't break anything
            host = request.host.decode()
            port = request.port
        path = None if not request.path else request.path.decode()
        self._metadata = [
            host, port, path, request.method,
        ]

        # Queue the original request, optionally with auth headers, to the upstream proxy.
        if self._endpoint.has_credentials:
            assert self._endpoint.username and self._endpoint.password
            request.add_header(
                httpHeaders.PROXY_AUTHORIZATION,
                b'Basic ' +
                base64.b64encode(
                    self._endpoint.username +
                    COLON +
                    self._endpoint.password,
                ),
            )
        self.upstream.queue(memoryview(request.build(
            for_proxy=True,
            disable_headers=[
                b'smartproxy-bypass',
                b'smartproxy-disable-bv3hi',
            ],
        )))
        return request

    def handle_client_data(self, raw: memoryview) -> Optional[memoryview]:
        """Only invoked when before_upstream_connection returns None."""
        # Queue data to the proxy endpoint.
        assert self.upstream
        self.upstream.queue(raw)
        return raw

    def handle_upstream_chunk(self, chunk: memoryview) -> Optional[memoryview]:
        """Will never be called since we didn't establish an upstream connection."""
        if not self.upstream:
            return chunk
        raise Exception("This should have never been called")

    def on_upstream_connection_close(self) -> None:
        """Called when the client connection has been closed."""
        if self.upstream and not self.upstream.closed:
            logger.debug('Closing upstream proxy connection')
            self.upstream.close()
            self.upstream = None

    def on_access_log(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        if not self.upstream:
            return context
        addr, port = (self.upstream.addr[0], self.upstream.addr[1]) \
            if self.upstream else (None, None)
        context.update({
            'upstream_proxy_host': addr,
            'upstream_proxy_port': port,
            'server_host': self._metadata[0],
            'server_port': self._metadata[1],
            'request_path': self._metadata[2],
            'response_bytes': self.total_size,
        })
        self.access_log(context)
        return None

    def access_log(self, log_attrs: Dict[str, Any]) -> None:
        access_log_format = DEFAULT_HTTPS_ACCESS_LOG_FORMAT
        request_method = self._metadata[3]
        if request_method and request_method != httpMethods.CONNECT:
            access_log_format = DEFAULT_HTTP_ACCESS_LOG_FORMAT
        for attr, value in log_attrs.items():
            if isinstance(value, bytes):
                log_attrs[attr] = value.decode('utf-8')
        logger.info(access_log_format.format_map(log_attrs))

    def _select_proxy(self, request_host: Optional[str] = None, smartproxy_bypass: bool = False, disable_smartproxy_bv3hi: bool = False) -> Url:
        online = int(self.redis.get('balancer_online'))
        if not online:
            logger.error('Server is not online!')
            return Url()

        if disable_smartproxy_bv3hi and smartproxy_bypass:
            # Prevent undefined behavior.
            logger.error('Duplicate options headers detected. Rejecting request.')
            return Url()

        if not disable_smartproxy_bv3hi:
            # The normal route.
            if request_host in BYPASS_SMARTPROXY_DOMAINS or smartproxy_bypass:
                valid_backends = redis_cycle('our_valid_proxies')
            else:
                valid_backends = redis_cycle('all_valid_proxies')
        else:
            valid_backends = redis_cycle('all_valid_proxies_with_broken_smartproxy')

        if not len(valid_backends):
            logger.error('No valid backends!')
            return Url()

        chosen_backend = valid_backends[0]
        is_smartproxy = chosen_backend in SMARTPROXY_POOL

        if not is_smartproxy:
            return Url(
                scheme='http'.encode(),
                hostname=chosen_backend.split(':')[0].encode(),
                port=int(chosen_backend.split(':')[1]),
            )
        else:
            return Url(
                scheme='http'.encode(),
                username=SMARTPROXY_USER.encode(),
                password=SMARTPROXY_PASS.encode(),
                hostname=chosen_backend.split(':')[0].encode(),
                port=int(chosen_backend.split(':')[1]),
            )

        # start_time = time.time()
        # while not len(backends) and time.time() - start_time < 30:  # wait a max of 30 seconds.
        #     time.sleep(1)  # wait for 1 second before checking again
        #     backends = redis_cycle('all_valid_proxies')
        # if not len(backends):
        #     logger.error('No available proxy after 30 seconds.')
        #     return Url()


@ -1,39 +0,0 @@
import redis

redis_cycler_db = redis.Redis(host='localhost', port=6379, db=9)


def redis_cycle(list_name):
    """
    Emulates itertools.cycle() but returns the complete shuffled list.

    :param list_name: name of the Redis list to rotate.
    :return: the rotated list as strings, or an empty list if the key is empty.
    """
    pipeline = redis_cycler_db.pipeline()
    pipeline.lpop(list_name)
    to_move = pipeline.execute()[0]
    if not to_move:
        return []
    pipeline.rpush(list_name, to_move)
    pipeline.lrange(list_name, 0, -1)
    results = pipeline.execute()
    new_list = results[-1]
    return [x.decode('utf-8') for x in new_list]
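# Example usage (illustrative): with a list seeded as ['1.2.3.4:3128', '5.6.7.8:3128'],
# each call pops the head, pushes it to the tail, and returns the whole list:
#
#   redis_cycle('all_valid_proxies')  # -> ['5.6.7.8:3128', '1.2.3.4:3128']
#   redis_cycle('all_valid_proxies')  # -> ['1.2.3.4:3128', '5.6.7.8:3128']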
def add_backend_cycler(list_name: str, new_elements: list):
    """Sync the Redis list with new_elements, preserving the order of entries already present."""
    existing_elements = [i.decode('utf-8') for i in redis_cycler_db.lrange(list_name, 0, -1)]
    existing_set = set(existing_elements)

    with redis_cycler_db.pipeline() as pipe:
        # Add new elements.
        for element in new_elements:
            if element not in existing_set:
                pipe.rpush(list_name, element)
        # Remove elements that are no longer valid.
        for element in existing_set:
            if element not in new_elements:
                pipe.lrem(list_name, 0, element)
        pipe.execute()


@ -1,5 +0,0 @@
from .config import SMARTPROXY_USER, SMARTPROXY_PASS


def transform_smartproxy(pxy_addr: str) -> str:
    """Turn a bare host:port proxy address into a proxy URL with SmartProxy credentials."""
    return f"http://{SMARTPROXY_USER}:{SMARTPROXY_PASS}@{pxy_addr}"