Compare commits

...

8 Commits

10 changed files with 193 additions and 67 deletions

View File

@ -2,12 +2,11 @@
_A round-robin load balancer for HTTP proxies._
This is a simple proxy load balancer that will route requests to a cluster of proxy backends in a round-robin fashion.
This makes it easy to connect your clients to a large number of proxy servers without worrying about implementing
anything special clientside.
This is a simple proxy load balancer that will route requests to a cluster of proxy backends in a round-robin fashion. This makes it easy to connect your clients to a large number of proxy servers without worrying about implementing anything special client-side.
- Downstream HTTPS proxy servers are not supported.
- This proxy server will transparently forward HTTPS requests without terminating them, meaning a self-signed certificate is not required.
This proxy server will transparently forward HTTPS requests without terminating them, meaning a self-signed certificate is not required. Downstream HTTPS proxy servers are not supported.
Memory usage sits at around 25M under load.
## Install
@ -22,21 +21,22 @@ You can run your own "public IP delivery server" `canihazip` <https://git.evulid
An example systemd service `loadbalancer.service` is provided.
The server displays stats and info at `/json`
The server displays health, stats, and info at `/json`.
```
=== Proxy Load Balancer ===
Usage of ./proxy-loadbalancer:
--config [string]
Path to the config file
--d, --debug
Enable debug mode
--v Print version and exit
Path to the config file
-d, --debug
Enable debug mode
--v Print version and exit
-h, --help Print this help message
```
## Special Headers
The load balancer accepts special headers to control its behavior.
The load balancer accepts special headers to control its behavior:
- `Thirdparty-Bypass`: don't use any third-party endpoints for this request.
- `Thirdparty-Include-Broken`: use all online endpoints for this request, including third-party ones that failed the special test.

View File

@ -1,14 +1,17 @@
# Port to run on.
http_port: 9000
http_port: 9000
# How many proxies will be checked at once?
proxy_checkers: 50
proxy_checkers: 50
# The interval between proxy checks in seconds.
proxy_check_interval: 60
# URL to get a proxy's IP.
ip_checker_url: https://api.ipify.org
ip_checker_url: https://api.ipify.org
# Connection timeout for the proxies in seconds.
proxy_connect_timeout: 10
proxy_connect_timeout: 60
# Your proxies.
proxy_pool_ours:
@ -27,4 +30,17 @@ thirdparty_test_urls:
# Don't route requests for these domains through the third-party proxies.
thirdparty_bypass_domains:
- twitter.com
- twitter.com
# Shuffle the proxy lists whenever the background thread refreshes them.
# If false, round-robin through the lists in their default order.
shuffle_proxies: false
# Don't allow requests to these domains through the proxy.
blocked_domains:
- example.com
# Resolve specific domains through specific proxies.
# Proxies here are not validated.
resolve_through:
github.com: http://1.2.3.4:3128

View File

@ -1,7 +0,0 @@
proxy.py @ git+https://github.com/abhinavsingh/proxy.py.git@develop
redis
requests
coloredlogs
psutil
flask
async_timeout

View File

@ -18,6 +18,10 @@ type Config struct {
ProxyPoolThirdparty []string
ThirdpartyTestUrls []string
ThirdpartyBypassDomains []string
ShuffleProxies bool
BlockedDomains []string
ResolveThrough map[string]string
ProxyCheckInterval int
}
func SetConfig(configFile string) (*Config, error) {
@ -29,11 +33,15 @@ func SetConfig(configFile string) (*Config, error) {
viper.SetConfigFile(configFile)
viper.SetDefault("http_port", "5000")
viper.SetDefault("proxy_checkers", 50)
viper.SetDefault("proxy_connect_timeout", 10)
viper.SetDefault("proxy_connect_timeout", 60)
viper.SetDefault("proxy_pool_ours", make([]string, 0))
viper.SetDefault("proxy_pool_thirdparty", make([]string, 0))
viper.SetDefault("thirdparty_test_urls", make([]string, 0))
viper.SetDefault("thirdparty_bypass_domains", make([]string, 0))
viper.SetDefault("shuffle_proxies", false)
viper.SetDefault("blocked_domains", make([]string, 0))
viper.SetDefault("resolve_through", make(map[string]string))
viper.SetDefault("proxy_check_interval", 60)
err := viper.ReadInConfig()
if err != nil {
@ -48,6 +56,14 @@ func SetConfig(configFile string) (*Config, error) {
ProxyPoolThirdparty: viper.GetStringSlice("proxy_pool_thirdparty"),
ThirdpartyTestUrls: viper.GetStringSlice("thirdparty_test_urls"),
ThirdpartyBypassDomains: viper.GetStringSlice("thirdparty_bypass_domains"),
ShuffleProxies: viper.GetBool("shuffle_proxies"),
BlockedDomains: viper.GetStringSlice("blocked_domains"),
ResolveThrough: viper.GetStringMapString("resolve_through"),
ProxyCheckInterval: viper.GetInt("proxy_check_interval"),
}
if len(config.ProxyPoolOurs) == 0 && len(config.ProxyPoolThirdparty) == 0 {
return nil, errors.New("no proxies configured")
}
if config.IpCheckerURL == "" {

View File

@ -82,6 +82,8 @@ func main() {
log.Fatalf(`Failed to load config: %s`, err)
}
log.Debugf(`Proxy check interval: %d sec`, config.GetConfig().ProxyCheckInterval)
proxyCluster := proxy.NewForwardProxyCluster()
go func() {
log.Fatal(http.ListenAndServe(":"+configData.HTTPPort, proxyCluster))
@ -89,7 +91,7 @@ func main() {
log.Infof("-> Server started on 0.0.0.0:%s <-", configData.HTTPPort)
go proxyCluster.ValidateProxiesThread()
proxyCluster.BalancerOnline.Wait()
proxyCluster.BalancerReady.Wait()
log.Infoln("-> Proxy server accepting requests <-")
select {}

View File

@ -25,17 +25,35 @@ func sendRequestThroughProxy(pxy string, targetURL string) (string, error) {
Timeout: config.GetConfig().ProxyConnectTimeout,
}
response, err := client.Get(targetURL)
req, err := http.NewRequest("GET", targetURL, nil)
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(response.Body)
req.Header.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7")
req.Header.Set("Accept-Language", "en-US,en;q=0.9")
req.Header.Set("Priority", "u=0, i")
req.Header.Set("Sec-Ch-Ua", `"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"`)
req.Header.Set("Sec-Ch-Ua-Mobile", "?0")
req.Header.Set("Sec-Ch-Ua-Platform", `"Windows"`)
req.Header.Set("Sec-Fetch-Dest", "document")
req.Header.Set("Sec-Fetch-Mode", "navigate")
req.Header.Set("Sec-Fetch-Site", "cross-site")
req.Header.Set("Sec-Fetch-User", "?1")
req.Header.Set("Upgrade-Insecure-Requests", "1")
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36")
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(bodyBytes), nil
}
return "", fmt.Errorf("bad response code %d", response.StatusCode)
return "", fmt.Errorf("bad response code %d", resp.StatusCode)
}

View File

@ -19,12 +19,18 @@ var (
HeaderThirdpartyBypass = "Thirdparty-Bypass"
)
func logProxyRequest(remoteAddr string, proxyHost string, targetHost string, returnCode *int, proxyConnectMode string, requestStartTime time.Time) {
log.Infof(`%s -> %s -> %s -> %d -- %s -- %d ms`, remoteAddr, proxyHost, targetHost, *returnCode, proxyConnectMode, time.Since(requestStartTime).Milliseconds())
// logProxyRequest writes the one-line summary for a proxied request in the
// form "<client> -> <proxy> -> <target> -> <status> -- <mode> -- <elapsed> ms".
//
// returnCode and errorMsg are dereferenced when this function runs, not when
// the call is set up, so a deferred call reports whatever the handler stored
// in them last. Both pointers must be non-nil. When *errorMsg is empty the
// summary is logged via log.Info; otherwise it is logged via log.Errorf with
// the error text appended after " -- ".
func logProxyRequest(remoteAddr string, proxyHost string, targetHost string, returnCode *int, proxyConnectMode string, requestStartTime time.Time, errorMsg *string) {
	msg := fmt.Sprintf(`%s -> %s -> %s -> %d -- %s -- %d ms`, remoteAddr, proxyHost, targetHost, *returnCode, proxyConnectMode, time.Since(requestStartTime).Milliseconds())
	if *errorMsg != "" {
		log.Errorf(`%s -- %s`, msg, *errorMsg)
		return
	}
	// msg is already fully formatted and embeds request-derived text (client
	// address, hosts). Passing it as a format string would re-interpret any
	// '%' it contains (e.g. an IPv6 zone such as "fe80::1%eth0") and garble
	// the output, so log it verbatim.
	log.Info(msg)
}
func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter, req *http.Request) (string, string, string, string, *url.URL, error) {
if p.BalancerOnline.GetCount() != 0 {
urlHostname := req.URL.Hostname()
if p.BalancerReady.GetCount() != 0 {
errStr := "proxy is not ready"
http.Error(w, errStr, http.StatusServiceUnavailable)
return "", "", "", "", nil, errors.New(errStr)
@ -39,6 +45,12 @@ func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter,
return "", "", "", "", nil, errors.New(errStr)
}
if slices.Contains(config.GetConfig().BlockedDomains, urlHostname) {
errStr := "this domain has been blocked"
http.Error(w, errStr, http.StatusUnavailableForLegalReasons)
return "", "", "", "", nil, errors.New(errStr)
}
headerIncludeBrokenThirdparty := req.Header.Get(HeaderThirdpartyIncludeBroken)
req.Header.Del(HeaderThirdpartyIncludeBroken)
headerBypassThirdparty := req.Header.Get(HeaderThirdpartyBypass)
@ -50,15 +62,20 @@ func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter,
}
var selectedProxy string
if slices.Contains(config.GetConfig().ThirdpartyBypassDomains, req.URL.Hostname()) {
selectedProxy = p.getProxyFromOurs()
if val, ok := config.GetConfig().ResolveThrough[urlHostname]; ok {
selectedProxy = val
} else {
if headerIncludeBrokenThirdparty != "" {
selectedProxy = p.getProxyFromAllWithBroken()
} else if headerBypassThirdparty != "" {
if slices.Contains(config.GetConfig().ThirdpartyBypassDomains, urlHostname) {
selectedProxy = p.getProxyFromOurs()
} else {
selectedProxy = p.getProxyFromAll()
if headerIncludeBrokenThirdparty != "" {
selectedProxy = p.getProxyFromAllWithBroken()
} else if headerBypassThirdparty != "" {
selectedProxy = p.getProxyFromOurs()
} else {
selectedProxy = p.getProxyFromAll()
}
}
}
if selectedProxy == "" {
@ -82,13 +99,21 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
_, proxyUser, proxyPass, proxyHost, parsedProxyUrl, err := p.validateRequestAndGetProxy(w, req)
if err != nil {
// Error has already been handled, just log and return.
log.Debugf(`%s -> %s -- HTTP -- Rejecting request: %s`, remoteAddr, proxyHost, err)
if proxyHost == "" {
proxyHost = "none"
}
log.Debugf(`%s -> %s -> %s -- HTTP -- Rejecting request: %s`, remoteAddr, proxyHost, req.Host, err)
return
}
// Variables for later
var returnCode *int
returnCode = new(int)
*returnCode = -1
defer logProxyRequest(remoteAddr, proxyHost, req.Host, returnCode, "HTTP", requestStartTime)
var errorMsg *string
errorMsg = new(string)
*errorMsg = ""
defer logProxyRequest(remoteAddr, proxyHost, req.Host, returnCode, "HTTP", requestStartTime, errorMsg)
parsedProxyUrl.Scheme = "http"
if proxyUser != "" && proxyPass != "" {
@ -103,7 +128,7 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
proxyReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body)
if err != nil {
log.Errorf(`Failed to make %s request to "%s": %s`, req.Method, req.URL.String(), err)
*errorMsg = fmt.Sprintf(`Failed to make %s request to "%s": %s`, req.Method, req.URL.String(), err)
http.Error(w, "failed to make request to downstream", http.StatusInternalServerError)
return
}
@ -113,7 +138,7 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
resp, err := client.Do(proxyReq)
if err != nil {
log.Errorf(`Failed to execute %s request to "%s": %s`, req.Method, req.URL.String(), err)
*errorMsg = fmt.Sprintf(`Failed to execute %s request to "%s": %s`, req.Method, req.URL.String(), err)
http.Error(w, "failed to execute request to downstream", http.StatusServiceUnavailable)
return
}
@ -132,18 +157,24 @@ func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http
_, proxyUser, proxyPass, proxyHost, _, err := p.validateRequestAndGetProxy(w, req)
if err != nil {
// Error has already been handled, just log and return.
log.Debugf(`%s -> %s -- CONNECT -- Rejecting request: %s`, remoteAddr, proxyHost, err)
if proxyHost == "" {
proxyHost = "none"
}
log.Debugf(`%s -> %s -> %s -- CONNECT -- Rejecting request: %s`, remoteAddr, proxyHost, targetHost, err)
return
}
var returnCode *int
returnCode = new(int)
*returnCode = -1
defer logProxyRequest(remoteAddr, proxyHost, targetHost, returnCode, "CONNECT", requestStartTime)
var errorMsg *string
errorMsg = new(string)
*errorMsg = ""
defer logProxyRequest(remoteAddr, proxyHost, targetHost, returnCode, "CONNECT", requestStartTime, errorMsg)
// Start a connection to the downstream proxy server.
proxyConn, err := net.DialTimeout("tcp", proxyHost, config.GetConfig().ProxyConnectTimeout)
if err != nil {
log.Errorf(`Failed to dial proxy %s - %s`, proxyHost, err)
*errorMsg = fmt.Sprintf(`Failed to dial proxy %s - %s`, proxyHost, err)
http.Error(w, "failed to make request to downstream", http.StatusServiceUnavailable)
return
}
@ -160,7 +191,7 @@ func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http
}
resp, err := http.ReadResponse(bufio.NewReader(proxyConn), req)
if resp == nil {
log.Errorf(`Failed to CONNECT to %s using proxy %s: %s`, req.Host, proxyHost, err)
*errorMsg = fmt.Sprintf(`Failed to CONNECT to %s using proxy %s: %s`, req.Host, proxyHost, err)
http.Error(w, "failed to execute request to downstream", http.StatusServiceUnavailable)
return
} else if err != nil {
@ -171,14 +202,14 @@ func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http
w.WriteHeader(http.StatusOK)
hj, ok := w.(http.Hijacker)
if !ok {
log.Errorf(`Failed to forward connection to %s using proxy %s`, req.Host, proxyHost)
*errorMsg = fmt.Sprintf(`Failed to forward connection to %s using proxy %s`, req.Host, proxyHost)
http.Error(w, "failed to forward connection to downstream", http.StatusServiceUnavailable)
return
}
clientConn, _, err := hj.Hijack()
if err != nil {
log.Errorf(`Failed to execute connection forwarding to %s using proxy %s`, req.Host, proxyHost)
*errorMsg = fmt.Sprintf(`Failed to execute connection forwarding to %s using proxy %s`, req.Host, proxyHost)
http.Error(w, "failed to execute connection forwarding to downstream", http.StatusServiceUnavailable)
return
}

View File

@ -11,13 +11,17 @@ import (
type ForwardProxyCluster struct {
mu sync.RWMutex
ourOnlineProxies []string
ourOfflineProxies []string
thirdpartyOnlineProxies []string
thirdpartyBrokenProxies []string
thirdpartyOfflineProxies []string
ipAddresses []string
BalancerOnline WaitGroupCountable
BalancerReady WaitGroupCountable
BalancerOnline bool
currentProxyAll int32
currentProxyOurs int32
currentProxyAllWithBroken int32
refreshInProgress bool
}
var log *logrus.Logger
@ -31,7 +35,8 @@ func NewForwardProxyCluster() *ForwardProxyCluster {
atomic.StoreInt32(&p.currentProxyAll, 0)
atomic.StoreInt32(&p.currentProxyOurs, 0)
atomic.StoreInt32(&p.currentProxyAllWithBroken, 0)
p.BalancerOnline.Add(1)
p.BalancerReady.Add(1)
p.BalancerOnline = false
return p
}

View File

@ -34,14 +34,19 @@ func (p *ForwardProxyCluster) ServeHTTP(w http.ResponseWriter, req *http.Request
} else if req.URL.Path == "/json" {
p.mu.RLock()
response := map[string]interface{}{
"uptime": int(math.Round(time.Since(startTime).Seconds())),
"online": p.BalancerOnline.GetCount() == 0,
"uptime": int(math.Round(time.Since(startTime).Seconds())),
"online": p.BalancerOnline && p.BalancerReady.GetCount() == 0,
"refreshInProgress": p.refreshInProgress,
"proxies": map[string]interface{}{
"totalOnline": len(p.ourOnlineProxies) + len(p.thirdpartyOnlineProxies),
"ours": removeCredentials(p.ourOnlineProxies),
"ours": map[string]interface{}{
"online": removeCredentials(p.ourOnlineProxies),
"offline": removeCredentials(p.ourOfflineProxies),
},
"thirdParty": map[string]interface{}{
"online": removeCredentials(p.thirdpartyOnlineProxies),
"broken": removeCredentials(p.thirdpartyBrokenProxies),
"online": removeCredentials(p.thirdpartyOnlineProxies),
"broken": removeCredentials(p.thirdpartyBrokenProxies),
"offline": removeCredentials(p.thirdpartyOfflineProxies),
},
},
}
@ -72,10 +77,14 @@ func removeCredentials(proxyURLs []string) []string {
for _, proxyURL := range proxyURLs {
u, err := url.Parse(proxyURL)
if err != nil {
return nil
// Skip if invalid.
continue
}
u.User = nil
newURLs = append(newURLs, u.String())
}
if len(newURLs) == 0 {
newURLs = make([]string, 0)
}
return newURLs
}

View File

@ -17,10 +17,14 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
ctx := context.TODO()
for {
startTime := time.Now()
p.refreshInProgress = true
allProxies := removeDuplicates(append(config.GetConfig().ProxyPoolOurs, config.GetConfig().ProxyPoolThirdparty...))
newOurOnlineProxies := make([]string, 0)
newOurOfflineProxies := make([]string, 0)
newThirdpartyOnlineProxies := make([]string, 0)
newThirdpartyBrokenProxies := make([]string, 0)
newThirdpartyOfflineProxies := make([]string, 0)
newIpAddresses := make([]string, 0)
var wg sync.WaitGroup
@ -31,6 +35,11 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
if err := sem.Acquire(ctx, 1); err != nil {
log.Errorf("Validate - failed to acquire semaphore: %v", err)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
defer sem.Release(1)
@ -38,32 +47,51 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
_, _, proxyHost, _, err := splitProxyURL(pxy)
if err != nil {
log.Errorf(`Invalid proxy "%s"`, pxy)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
// Test the proxy.
ipAddr, testErr := sendRequestThroughProxy(pxy, config.GetConfig().IpCheckerURL)
if testErr != nil {
log.Warnf("Validate - proxy %s failed: %s", proxyHost, testErr)
log.Debugf("Validate - %s failed: %s", proxyHost, testErr)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
if slices.Contains(newIpAddresses, ipAddr) {
log.Warnf("Validate - duplicate IP Address %s for proxy %s", ipAddr, proxyHost)
log.Warnf("Validate - duplicate IP Address %s for %s", ipAddr, proxyHost)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
newIpAddresses = append(newIpAddresses, ipAddr)
// Sort the proxy into the right groups.
if isThirdparty(pxy) {
newThirdpartyOnlineProxies = append(newThirdpartyOnlineProxies, pxy)
okToAdd := true
for _, d := range config.GetConfig().ThirdpartyTestUrls {
_, bv3hiErr := sendRequestThroughProxy(pxy, d)
if bv3hiErr != nil {
log.Debugf("Validate - Third-party %s failed: %s", proxyHost, bv3hiErr)
okToAdd = false
newThirdpartyBrokenProxies = append(newThirdpartyBrokenProxies, pxy)
log.Debugf("Validate - %s failed third-party test: %s", proxyHost, bv3hiErr)
break
}
}
if okToAdd {
newThirdpartyOnlineProxies = append(newThirdpartyOnlineProxies, pxy)
}
} else {
newOurOnlineProxies = append(newOurOnlineProxies, pxy)
}
@ -73,26 +101,34 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
p.mu.Lock()
p.ourOnlineProxies = removeDuplicates(newOurOnlineProxies)
p.ourOfflineProxies = newOurOfflineProxies
p.thirdpartyOnlineProxies = removeDuplicates(newThirdpartyOnlineProxies)
p.thirdpartyBrokenProxies = removeDuplicates(newThirdpartyBrokenProxies)
p.thirdpartyOfflineProxies = newThirdpartyOfflineProxies
p.ipAddresses = removeDuplicates(newIpAddresses)
p.BalancerOnline = len(slices.Concat(p.ourOnlineProxies, p.thirdpartyOnlineProxies, p.thirdpartyBrokenProxies)) > 0 // Online only if there are active and online proxies.
p.mu.Unlock()
if !started {
if config.GetConfig().ShuffleProxies {
p.mu.Lock()
p.ourOnlineProxies = shuffle(p.ourOnlineProxies)
p.thirdpartyOnlineProxies = shuffle(p.thirdpartyOnlineProxies)
p.mu.Unlock()
}
if !started {
started = true
p.BalancerOnline.Done()
p.BalancerReady.Done()
}
p.mu.RLock()
log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d",
len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+(len(p.thirdpartyOnlineProxies)-len(p.thirdpartyBrokenProxies)))
log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d, Elapsed: %s",
len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+len(p.thirdpartyOnlineProxies), time.Since(startTime),
)
p.mu.RUnlock()
time.Sleep(60 * time.Second)
p.refreshInProgress = false
time.Sleep(time.Duration(config.GetConfig().ProxyCheckInterval) * time.Second)
}
}