Cyberes 2024-01-14 14:35:42 -07:00
parent 5666c2ef26
commit 3263194b52
19 changed files with 641 additions and 0 deletions

.gitignore vendored

@@ -1,3 +1,6 @@
.idea/
config.py
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/

canihazip/README.md Normal file

@@ -0,0 +1 @@
A simple server that returns the client's IP address. Used by the proxy load balancer to check each backend's exit IP.
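
For example, once the service is running (the included systemd unit binds gunicorn to port 5000; the hostname below is a placeholder and the returned address is illustrative):

```console
$ curl http://your-server:5000/
203.0.113.7
```

Any request path returns the same response, since the server registers a catch-all route.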

canihazip/canihazip.service Normal file

@@ -0,0 +1,13 @@
[Unit]
Description=canihazip
After=network.target

[Service]
User=flask
Group=nogroup
WorkingDirectory=/srv/canihazip
ExecStart=/srv/canihazip/venv/bin/gunicorn --workers 3 --bind 0.0.0.0:5000 server:app --timeout 120
Restart=always

[Install]
WantedBy=multi-user.target

canihazip/server.py Normal file

@@ -0,0 +1,14 @@
from flask import Flask
from flask import request

app = Flask(__name__)


@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def get_my_ip(path):
    return request.remote_addr, 200


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)

loadbalancer.service Normal file

@@ -0,0 +1,13 @@
[Unit]
Description=Proxy Load Distributor
After=network.target

[Service]
User=loadbalancer
Group=loadbalancer
WorkingDirectory=/srv/loadbalancer/huggingface-proxy/distributor-server/proxy-skeleton
ExecStart=/srv/loadbalancer/huggingface-proxy/distributor-server/proxy-skeleton/venv/bin/python -m app
Restart=always

[Install]
WantedBy=multi-user.target

proxy-skeleton/README.md Normal file

@@ -0,0 +1,45 @@
# Skeleton App

This directory contains a sample standalone application structure that uses `proxy.py`
via its `requirements.txt` file.

## Setup

```console
$ git clone https://github.com/abhinavsingh/proxy.py.git
$ cd proxy.py/skeleton
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt
```

## Run It

`python -m app`

Start your app, then make a web request to `/` and a proxy request through the instance (example
`curl` commands are shown after the log output below). You will see log lines like this:

```console
$ python -m app
...[redacted]... - Loaded plugin proxy.http.proxy.HttpProxyPlugin
...[redacted]... - Loaded plugin proxy.http.server.HttpWebServerPlugin
...[redacted]... - Loaded plugin app.plugins.MyWebServerPlugin
...[redacted]... - Loaded plugin app.plugins.MyProxyPlugin
...[redacted]... - Listening on 127.0.0.1:9000
...[redacted]... - Started 16 acceptors in threadless (local) mode
...[redacted]... - HttpProtocolException: HttpRequestRejected b"I'm a tea pot"
...[redacted]... - 127.0.0.1:64601 - GET None:None/get - None None - 0 bytes - 0.64ms
...[redacted]... - 127.0.0.1:64622 - GET / - curl/7.77.0 - 0.95ms
```
Voila!!!
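
For reference, the two requests mentioned above can be issued like this (the skeleton listens on port 9000; `httpbin.org` is only an example upstream target):

```console
$ curl http://127.0.0.1:9000/
$ curl -x http://127.0.0.1:9000 http://httpbin.org/get
```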

That is your custom app skeleton structure built on top of `proxy.py`. Now copy the `app` directory
outside of the `proxy.py` repo and create your own git repo, then customize `app` for your project's needs.

## Run in detached (background) mode

1. For one-off use cases, you can start the app in the background directly:
   `python -m app 2>&1 &`
2. For production use, you will likely want a process manager such as supervisord or systemd
   (see `loadbalancer.service` at the repository root for an example unit).

proxy-skeleton/app/__init__.py Normal file

@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~

Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from .app import entry_point

__all__ = [
    'entry_point',
]

proxy-skeleton/app/__main__.py Normal file

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~

Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from app import entry_point

if __name__ == '__main__':
    entry_point()

proxy-skeleton/app/app.py Normal file

@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~

Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import ipaddress
import logging
import threading
import time

import coloredlogs
from proxy import proxy
from redis import Redis

from . import suicide
from .background import validate_proxies

coloredlogs.install(level='INFO')


def entry_point() -> None:
    logger = logging.getLogger(__name__)

    redis = Redis(host='localhost', port=6379, decode_responses=True)
    redis.flushall()
    redis.set('balancer_online', 0)
    redis.set('suicide_online', 0)

    threading.Thread(target=validate_proxies, daemon=True).start()

    time.sleep(5)

    while not int(redis.get('balancer_online')):
        logger.warning('Waiting for background thread to populate proxies...')
        time.sleep(5)

    # suicide.SUICIDE_PACT.pact = threading.Thread(target=suicide.check_url_thread, args=("http://127.0.0.1:9000",), daemon=True)
    # suicide.SUICIDE_PACT.pact.start()

    with proxy.Proxy(
            enable_web_server=True,
            port=9000,
            timeout=300,
            hostname=ipaddress.IPv4Address('0.0.0.0'),
            # NOTE: Pass plugins via *args if you define custom flags.
            # Currently plugins passed via **kwargs are not discovered for
            # custom flags by proxy.py
            #
            # See https://github.com/abhinavsingh/proxy.py/issues/871
            plugins=[
                'app.plugins.ProxyLoadBalancer',
            ],
    ) as _:
        proxy.sleep_loop()

proxy-skeleton/app/background.py Normal file

@@ -0,0 +1,82 @@
import concurrent.futures
import logging
import random
import time

import requests
from redis import Redis

from .config import PROXY_POOL, SMARTPROXY_POOL, IP_CHECKER, MAX_PROXY_CHECKERS
from .redis_cycle import add_backend_cycler
from .smartproxy import transform_smartproxy


def validate_proxies():
    """
    Validate proxies by sending a request to the IP checker endpoint (IP_CHECKER) and checking the resulting IP address.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    redis = Redis(host='localhost', port=6379, decode_responses=True)
    logger.info('Doing initial backend check, please wait...')
    started = False

    while True:
        our_online_backends = {}
        smartproxy_online_backends = {}
        ip_addresses = set()

        def check_proxy(pxy):
            try:
                smartproxy = False
                if pxy in SMARTPROXY_POOL:
                    smartproxy = True
                    r = requests.get(IP_CHECKER, proxies={'http': transform_smartproxy(pxy), 'https': transform_smartproxy(pxy)}, timeout=15)
                    # r_test = requests.get(TEST_LARGE_FILE, proxies={'http': transform_smartproxy(pxy), 'https': transform_smartproxy(pxy)}, timeout=15)
                else:
                    r = requests.get(IP_CHECKER, proxies={'http': pxy, 'https': pxy}, timeout=15)
                    # r_test = requests.get(TEST_LARGE_FILE, proxies={'http': pxy, 'https': pxy}, timeout=15)

                if r.status_code != 200:
                    logger.debug(f'PROXY TEST failed - {pxy} - got code {r.status_code}')
                    return

                # if r_test.status_code != 200:
                #     logger.debug(f'PROXY TEST failed - {pxy} - test download got code {r_test.status_code}')
                #     return

                ip = r.text
                if ip not in ip_addresses:
                    proxy_dict = our_online_backends if not smartproxy else smartproxy_online_backends
                    ip_addresses.add(ip)
                    proxy_dict[pxy] = ip
                else:
                    s = ' Smartproxy ' if smartproxy else ' '
                    logger.debug(f'Duplicate{s}IP: {ip}')
            except Exception as e:
                logger.debug(f'PROXY TEST failed - {pxy} - {e}')  # ': {e.__class__.__name__}')
                # traceback.print_exc()

        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PROXY_CHECKERS) as executor:
            executor.map(check_proxy, set(PROXY_POOL) | set(SMARTPROXY_POOL))

        our_valid_proxies = list(our_online_backends.keys())
        smartproxy_valid_proxies = list(smartproxy_online_backends.keys())
        valid_proxies = list(set(our_valid_proxies) | set(smartproxy_valid_proxies))

        if not started:
            random.shuffle(valid_proxies)
            random.shuffle(valid_proxies)
            started = True

        add_backend_cycler('proxy_backends', valid_proxies)

        if logger.level == logging.DEBUG:
            logger.debug(f'Our Backends Online ({len(our_valid_proxies)}): {our_online_backends}')
            logger.debug(f'Smartproxy Backends Online ({len(smartproxy_valid_proxies)}): {smartproxy_valid_proxies}')
        else:
            logger.info(f'Our Backends Online: {len(our_valid_proxies)}, Smartproxy Backends Online: {len(smartproxy_valid_proxies)}, Total: {len(our_valid_proxies) + len(smartproxy_valid_proxies)}')

        redis.set('balancer_online', 1)

        time.sleep(10)

        # if int(redis.get('suicide_online')) == 1 and not suicide.SUICIDE_PACT.pact.is_alive():
        #     logger.critical('Suicide thread not running!')
        #     os.kill(os.getpid(), signal.SIGTERM)

proxy-skeleton/app/config.py.example Normal file

@@ -0,0 +1,16 @@
IP_CHECKER = 'http://[your IP or domain name]:5000'

MAX_PROXY_CHECKERS = 50

SMARTPROXY_USER = 'example'
SMARTPROXY_PASS = 'password'

PROXY_POOL = [
    "127.0.0.1:3128",
    "[other server]:3128",
]

SMARTPROXY_POOL = [
    # "dc.smartproxy.com:10001",
    # ...
]

proxy-skeleton/app/pid.py Normal file

@@ -0,0 +1,32 @@
import os
import signal

import psutil


def check_zombie(process):
    try:
        return process.status() == psutil.STATUS_ZOMBIE
    except psutil.NoSuchProcess:
        return False


def get_children_pids(pid):
    parent = psutil.Process(pid)
    children = parent.children(recursive=True)
    return [child.pid for child in children]


def zombie_slayer():
    pid = os.getpid()
    children_pids = get_children_pids(pid)
    zombies = []
    for child_pid in children_pids:
        child = psutil.Process(child_pid)
        if check_zombie(child):
            zombies.append(child_pid)

    if zombies:
        print(f"Zombie processes detected: {zombies}")
        print("Killing parent process to reap zombies...")
        os.kill(pid, signal.SIGKILL)

proxy-skeleton/app/plugins/__init__.py Normal file

@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~

Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from .load_balancer import ProxyLoadBalancer

__all__ = [
    'ProxyLoadBalancer',
]

proxy-skeleton/app/plugins/load_balancer.py Normal file

@@ -0,0 +1,233 @@
import base64
import ipaddress
import logging
from typing import Any, Dict, List, Optional

from proxy.common.constants import (ANY_INTERFACE_HOSTNAMES, COLON, LOCAL_INTERFACE_HOSTNAMES)
from proxy.common.utils import text_
from proxy.core.base import TcpUpstreamConnectionHandler
from proxy.http import Url, httpHeaders, httpMethods
from proxy.http.exception import HttpProtocolException
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from redis import Redis

from ..config import SMARTPROXY_USER, SMARTPROXY_PASS, SMARTPROXY_POOL
from ..redis_cycle import redis_cycle

logger = logging.getLogger(__name__)

DEFAULT_HTTP_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
    '{request_method} {upstream_proxy_host}:{upstream_proxy_port} -> {server_host}:{server_port}{request_path} - ' + \
    '{response_code} {response_reason} - {response_bytes} bytes - ' + \
    '{connection_time_ms} ms'

DEFAULT_HTTPS_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
    '{request_method} {upstream_proxy_host}:{upstream_proxy_port} -> {server_host}:{server_port} - ' + \
    '{response_bytes} bytes - {connection_time_ms} ms'


class ProxyLoadBalancer(TcpUpstreamConnectionHandler, HttpProxyBasePlugin):
    """Proxy pool plugin simply acts as a proxy adapter for proxy.py itself.

    Imagine this plugin as setting up proxy settings for proxy.py instance itself.
    All incoming client requests are proxied to configured upstream proxies."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.redis = Redis(host='localhost', port=6379, decode_responses=True)
        self._endpoint: Url = self._select_proxy()
        # Cached attributes to be used during access log override
        self._metadata: List[Any] = [
            None, None, None, None,
        ]

    def handle_upstream_data(self, raw: memoryview) -> None:
        self.client.queue(raw)

    def before_upstream_connection(
            self, request: HttpParser,
    ) -> Optional[HttpParser]:
        """Avoids establishing the default connection to upstream server
        by returning None.

        TODO(abhinavsingh): Ideally connection to upstream proxy endpoints
        must be bootstrapped within it's own re-usable and garbage collected pool,
        to avoid establishing a new upstream proxy connection for each client request.

        See :class:`~proxy.core.connection.pool.UpstreamConnectionPool` which is a work
        in progress for SSL cache handling.
        """
        # We don't want to send private IP requests to remote proxies
        try:
            if ipaddress.ip_address(text_(request.host)).is_private:
                return request
        except ValueError:
            pass
        # If chosen proxy is the local instance, bypass upstream proxies
        assert self._endpoint.port and self._endpoint.hostname
        if self._endpoint.port == self.flags.port and \
                self._endpoint.hostname in LOCAL_INTERFACE_HOSTNAMES + ANY_INTERFACE_HOSTNAMES:
            return request
        # Establish connection to chosen upstream proxy
        endpoint_tuple = (text_(self._endpoint.hostname), self._endpoint.port)
        logger.debug('Using endpoint: {0}:{1}'.format(*endpoint_tuple))
        self.initialize_upstream(*endpoint_tuple)
        assert self.upstream
        try:
            self.upstream.connect()
        except TimeoutError:
            raise HttpProtocolException(
                'Timed out connecting to upstream proxy {0}:{1}'.format(
                    *endpoint_tuple,
                ),
            )
        except ConnectionRefusedError:
            # TODO(abhinavsingh): Try another choice, when all (or max configured) choices have
            # exhausted, retry for configured number of times before giving up.
            #
            # Failing upstream proxies, must be removed from the pool temporarily.
            # A periodic health check must put them back in the pool. This can be achieved
            # using a data structure without having to spawn separate thread/process for health
            # check.
            raise HttpProtocolException(
                'Connection refused by upstream proxy {0}:{1}'.format(
                    *endpoint_tuple,
                ),
            )
        logger.debug(
            'Established connection to upstream proxy {0}:{1}'.format(
                *endpoint_tuple,
            ),
        )
        return None

    def handle_client_request(
            self, request: HttpParser,
    ) -> Optional[HttpParser]:
        """Only invoked once after client original proxy request has been received completely."""
        if not self.upstream:
            return request
        assert self.upstream
        # For log sanity (i.e. to avoid None:None), expose upstream host:port from headers
        host, port = None, None
        # Browser or applications may sometime send
        #
        # "CONNECT / HTTP/1.0\r\n\r\n"
        #
        # for proxy keep alive checks.
        if request.has_header(b'host'):
            url = Url.from_bytes(request.header(b'host'))
            assert url.hostname
            host, port = url.hostname.decode('utf-8'), url.port
            port = port if port else (
                443 if request.is_https_tunnel else 80
            )
        else:
            # TODO: make sure this doesn't break anything
            host = request.host.decode()
            port = request.port
        path = None if not request.path else request.path.decode()
        self._metadata = [
            host, port, path, request.method,
        ]
        # Queue original request optionally with auth headers to upstream proxy
        if self._endpoint.has_credentials:
            assert self._endpoint.username and self._endpoint.password
            request.add_header(
                httpHeaders.PROXY_AUTHORIZATION,
                b'Basic ' +
                base64.b64encode(
                    self._endpoint.username +
                    COLON +
                    self._endpoint.password,
                ),
            )
        self.upstream.queue(memoryview(request.build(for_proxy=True)))
        return request

    def handle_client_data(self, raw: memoryview) -> Optional[memoryview]:
        """Only invoked when before_upstream_connection returns None"""
        # Queue data to the proxy endpoint
        assert self.upstream
        self.upstream.queue(raw)
        return raw

    def handle_upstream_chunk(self, chunk: memoryview) -> Optional[memoryview]:
        """Will never be called since we didn't establish an upstream connection."""
        if not self.upstream:
            return chunk
        raise Exception("This should have never been called")

    def on_upstream_connection_close(self) -> None:
        """Called when client connection has been closed."""
        if self.upstream and not self.upstream.closed:
            logger.debug('Closing upstream proxy connection')
            self.upstream.close()
            self.upstream = None

    def on_access_log(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        if not self.upstream:
            return context
        addr, port = (self.upstream.addr[0], self.upstream.addr[1]) \
            if self.upstream else (None, None)
        context.update({
            'upstream_proxy_host': addr,
            'upstream_proxy_port': port,
            'server_host': self._metadata[0],
            'server_port': self._metadata[1],
            'request_path': self._metadata[2],
            'response_bytes': self.total_size,
        })
        self.access_log(context)
        return None

    def access_log(self, log_attrs: Dict[str, Any]) -> None:
        access_log_format = DEFAULT_HTTPS_ACCESS_LOG_FORMAT
        request_method = self._metadata[3]
        if request_method and request_method != httpMethods.CONNECT:
            access_log_format = DEFAULT_HTTP_ACCESS_LOG_FORMAT
        for attr, value in log_attrs.items():
            if isinstance(value, bytes):
                log_attrs[attr] = value.decode('utf-8')
        logger.info(access_log_format.format_map(log_attrs))

    def _select_proxy(self) -> Url:
        online = int(self.redis.get('balancer_online'))
        if not online:
            logger.error('Server is not online!')
            return Url()

        valid_backends = redis_cycle('proxy_backends')
        if not len(valid_backends):
            logger.error('No valid backends!')
            return Url()

        chosen_backend = valid_backends[0]
        is_smartproxy = chosen_backend in SMARTPROXY_POOL

        if not is_smartproxy:
            return Url(
                scheme='http'.encode(),
                hostname=chosen_backend.split(':')[0].encode(),
                port=int(chosen_backend.split(':')[1]),
            )
        else:
            return Url(
                scheme='http'.encode(),
                username=SMARTPROXY_USER.encode(),
                password=SMARTPROXY_PASS.encode(),
                hostname=chosen_backend.split(':')[0].encode(),
                port=int(chosen_backend.split(':')[1]),
            )

        # start_time = time.time()
        # while not len(backends) and time.time() - start_time < 30:  # wait a max of 30 seconds.
        #     time.sleep(1)  # wait for 1 second before checking again
        #     backends = redis_cycle('proxy_backends')
        # if not len(backends):
        #     logger.error('No available proxy after 30 seconds.')
        #     return Url()

proxy-skeleton/app/redis_cycle.py Normal file

@@ -0,0 +1,39 @@
import redis

redis_cycler_db = redis.Redis(host='localhost', port=6379, db=9)


def redis_cycle(list_name):
    """
    Emulates itertools.cycle() using a Redis list: pops the head element, pushes it
    back onto the tail, and returns the complete rotated list.

    :param list_name:
    :return:
    """
    pipeline = redis_cycler_db.pipeline()
    pipeline.lpop(list_name)
    to_move = pipeline.execute()[0]
    if not to_move:
        return []
    pipeline.rpush(list_name, to_move)
    pipeline.lrange(list_name, 0, -1)
    results = pipeline.execute()
    new_list = results[-1]
    return [x.decode('utf-8') for x in new_list]


def add_backend_cycler(list_name: str, new_elements: list):
    existing_elements = [i.decode('utf-8') for i in redis_cycler_db.lrange(list_name, 0, -1)]
    existing_set = set(existing_elements)

    with redis_cycler_db.pipeline() as pipe:
        # Add elements
        for element in new_elements:
            if element not in existing_set:
                pipe.rpush(list_name, element)
        # Remove elements
        for element in existing_set:
            if element not in new_elements:
                pipe.lrem(list_name, 0, element)
        pipe.execute()
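
A quick sketch of how these two helpers behave together (hypothetical backend addresses; assumes a local Redis on the default port with db 9 as above, and that the `app` package is importable from the proxy-skeleton directory):

```python
from app.redis_cycle import add_backend_cycler, redis_cycle

# Seed the cycler with three hypothetical backends.
add_backend_cycler('proxy_backends', ['10.0.0.1:3128', '10.0.0.2:3128', '10.0.0.3:3128'])

# Each call rotates the list by one, so repeated callers walk the backends
# round-robin while always receiving the full current list.
print(redis_cycle('proxy_backends'))  # ['10.0.0.2:3128', '10.0.0.3:3128', '10.0.0.1:3128']
print(redis_cycle('proxy_backends'))  # ['10.0.0.3:3128', '10.0.0.1:3128', '10.0.0.2:3128']

# Syncing with a shorter list removes the missing backend from Redis.
add_backend_cycler('proxy_backends', ['10.0.0.1:3128', '10.0.0.3:3128'])
```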

proxy-skeleton/app/smartproxy.py Normal file

@@ -0,0 +1,5 @@
from .config import SMARTPROXY_USER, SMARTPROXY_PASS


def transform_smartproxy(pxy_addr: str):
    return f"http://{SMARTPROXY_USER}:{SMARTPROXY_PASS}@{pxy_addr}"

proxy-skeleton/app/suicide.py Normal file

@@ -0,0 +1,35 @@
import logging
import os
import signal
import threading
import time

import requests
from redis import Redis


def check_url_thread(url: str):
    redis = Redis(host='localhost', port=6379, decode_responses=True)
    redis.set('suicide_online', 1)
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    time.sleep(30)  # give the server some time to start up
    logger.info('Created a suicide pact.')
    while True:
        try:
            response = requests.get(url, timeout=10)
            if response.status_code != 404:
                logger.critical(f"Fetch failed with status code: {response.status_code}")
                os.kill(os.getpid(), signal.SIGTERM)
        except requests.exceptions.RequestException as e:
            logger.critical(f"Fetch failed with exception: {e}")
            os.kill(os.getpid(), signal.SIGTERM)
        time.sleep(10)


class SuicidePact:
    def __init__(self):
        self.pact = threading.Thread()


SUICIDE_PACT = SuicidePact()

requirements.txt Normal file

@@ -0,0 +1,7 @@
proxy.py @ git+https://github.com/abhinavsingh/proxy.py.git@develop
redis
requests
async_timeout
coloredlogs
psutil
flask