Compare commits

...

16 Commits

Author SHA1 Message Date
Cyberes e93f353860 delete old file 2024-05-14 17:54:36 -06:00
Cyberes c3e60eaca9 adjust logging and track how long the validation checks take 2024-05-14 17:41:47 -06:00
Cyberes 8a2ee75188 mimic chrome on windows headers when testing proxy 2024-05-14 16:59:23 -06:00
Cyberes a886056d7f add option to block domains, add option to resolve specific domains through specific proxies, option to set the check interval 2024-05-06 21:00:35 -06:00
Cyberes 2ce41d0637 fix issue with "total valid" showing up as negative 2024-04-30 18:15:35 -06:00
Cyberes ea89052ef0 fix issue with invalid third-party proxy ending up in the valid third-party list 2024-04-22 20:55:37 -06:00
Cyberes 68a444c17c update readme 2024-04-16 21:48:06 -06:00
Cyberes 3b66b63469 update readme, update config, require proxies to start, track offline proxies, move shuffle behind config, modify json response 2024-04-12 22:50:44 -06:00
Cyberes 9316c0c1bc update readme 2024-04-12 20:50:06 -06:00
Cyberes d10c84d974 don't check for duplicates when getting a proxy 2024-04-12 20:47:55 -06:00
Cyberes 5273d1b16f improve background thread resouce locking, adjust printing 2024-04-12 20:44:05 -06:00
Cyberes fb124561eb adjust logging 2024-04-12 20:36:25 -06:00
Cyberes dd1a880e41 fix exception from proxy server when background thread is updating proxies, return the proxied status code instead of handling it on the server,add more fields to request logging 2024-04-12 20:31:59 -06:00
Cyberes 8cddcbcb2c make request logging info level, add request logging to webserver, update service file 2024-04-12 19:31:28 -06:00
Cyberes 684d9c8adf log rejection request 2024-04-12 19:26:18 -06:00
Cyberes 341b82c2e4 remove proxy creds from json status 2024-04-12 19:21:11 -06:00
11 changed files with 266 additions and 105 deletions

View File

@ -2,11 +2,11 @@
_A round-robin load balancer for HTTP proxies._
This is a simple proxy load balancer that will route requests to a cluster of proxy backends in a round-robin fashion.
This makes it easy to connect your clients to a large number of proxy servers without worrying about implementing
anything special clientside.
This is a simple proxy load balancer that will route requests to a cluster of proxy backends in a round-robin fashion. This makes it easy to connect your clients to a large number of proxy servers without worrying about implementing anything special clientside.
HTTPS proxy servers are not supported.
This proxy server will transparently forward HTTPS requests without terminating them, meaning a self-signed certificate is not required. Downstream HTTPS proxy servers are not supported.
Memory usage sits at around 25M under load.
## Install
@ -21,11 +21,22 @@ You can run your own "public IP delivery server" `canihazip` <https://git.evulid
An example systemd service `loadbalancer.service` is provided.
The server displays stats and info at `/json`
The server displays health, stats, info at `/json`.
```
=== Proxy Load Balancer ===
Usage of ./proxy-loadbalancer:
--config [string]
Path to the config file
-d, --debug
Enable debug mode
--v Print version and exit
-h, --help Print this help message
```
## Special Headers
The load balancer accepts special headers to control its behavior.
The load balancer accepts special headers to control its behavior:
- `Thirdparty-Bypass`: don't use any third-party endpoints for this request.
- `Thirdparty-Include-Broken`: use all online endpoints for this request, including third-party ones that failed the special test.

View File

@ -1,14 +1,17 @@
# Port to run on.
http_port: 9000
http_port: 9000
# How many proxies will be checked at once?
proxy_checkers: 50
proxy_checkers: 50
# The interval between proxy checks in seconds.
proxy_check_interval: 60
# URL to get a proxy's IP.
ip_checker_url: https://api.ipify.org
ip_checker_url: https://api.ipify.org
# Connection timeout for the proxies in seconds.
proxy_connect_timeout: 10
proxy_connect_timeout: 60
# Your proxies.
proxy_pool_ours:
@ -27,4 +30,17 @@ thirdparty_test_urls:
# Don't route requests for these domains through the third-party proxies.
thirdparty_bypass_domains:
- twitter.com
- twitter.com
# Shuffle the proxy lists whenever the background thread refreshes them.
# If false, round-robin on default order.
shuffle_proxies: false
# Don't allow requests to these domains through the proxy.
blocked_domains:
- example.com
# Resolve specific domains through specific proxies.
# Proxies here are not validated.
resolve_through:
github.com: http://1.2.3.4:3128

View File

@ -6,7 +6,7 @@ After=network.target
SyslogIdentifier=proxy-loadbalancer
User=loadbalancer
Group=loadbalancer
ExecStart=/srv/loadbalancer/proxy-loadbalancer --config /etc/proxy-loadbalancer/config.yml
ExecStart=/srv/loadbalancer/proxy-loadbalancer -d --config /etc/proxy-loadbalancer/config.yml
Restart=always
[Install]

View File

@ -1,7 +0,0 @@
proxy.py @ git+https://github.com/abhinavsingh/proxy.py.git@develop
redis
requests
coloredlogs
psutil
flask
async_timeout

View File

@ -18,6 +18,10 @@ type Config struct {
ProxyPoolThirdparty []string
ThirdpartyTestUrls []string
ThirdpartyBypassDomains []string
ShuffleProxies bool
BlockedDomains []string
ResolveThrough map[string]string
ProxyCheckInterval int
}
func SetConfig(configFile string) (*Config, error) {
@ -29,11 +33,15 @@ func SetConfig(configFile string) (*Config, error) {
viper.SetConfigFile(configFile)
viper.SetDefault("http_port", "5000")
viper.SetDefault("proxy_checkers", 50)
viper.SetDefault("proxy_connect_timeout", 10)
viper.SetDefault("proxy_connect_timeout", 60)
viper.SetDefault("proxy_pool_ours", make([]string, 0))
viper.SetDefault("proxy_pool_thirdparty", make([]string, 0))
viper.SetDefault("thirdparty_test_urls", make([]string, 0))
viper.SetDefault("thirdparty_bypass_domains", make([]string, 0))
viper.SetDefault("shuffle_proxies", false)
viper.SetDefault("blocked_domains", make([]string, 0))
viper.SetDefault("resolve_through", make(map[string]string))
viper.SetDefault("proxy_check_interval", 60)
err := viper.ReadInConfig()
if err != nil {
@ -48,6 +56,14 @@ func SetConfig(configFile string) (*Config, error) {
ProxyPoolThirdparty: viper.GetStringSlice("proxy_pool_thirdparty"),
ThirdpartyTestUrls: viper.GetStringSlice("thirdparty_test_urls"),
ThirdpartyBypassDomains: viper.GetStringSlice("thirdparty_bypass_domains"),
ShuffleProxies: viper.GetBool("shuffle_proxies"),
BlockedDomains: viper.GetStringSlice("blocked_domains"),
ResolveThrough: viper.GetStringMapString("resolve_through"),
ProxyCheckInterval: viper.GetInt("proxy_check_interval"),
}
if len(config.ProxyPoolOurs) == 0 && len(config.ProxyPoolThirdparty) == 0 {
return nil, errors.New("no proxies configured")
}
if config.IpCheckerURL == "" {

View File

@ -82,6 +82,8 @@ func main() {
log.Fatalf(`Failed to load config: %s`, err)
}
log.Debugf(`Proxy check interval: %d sec`, config.GetConfig().ProxyCheckInterval)
proxyCluster := proxy.NewForwardProxyCluster()
go func() {
log.Fatal(http.ListenAndServe(":"+configData.HTTPPort, proxyCluster))
@ -89,7 +91,7 @@ func main() {
log.Infof("-> Server started on 0.0.0.0:%s <-", configData.HTTPPort)
go proxyCluster.ValidateProxiesThread()
proxyCluster.BalancerOnline.Wait()
proxyCluster.BalancerReady.Wait()
log.Infoln("-> Proxy server accepting requests <-")
select {}

View File

@ -25,17 +25,35 @@ func sendRequestThroughProxy(pxy string, targetURL string) (string, error) {
Timeout: config.GetConfig().ProxyConnectTimeout,
}
response, err := client.Get(targetURL)
req, err := http.NewRequest("GET", targetURL, nil)
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(response.Body)
req.Header.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7")
req.Header.Set("Accept-Language", "en-US,en;q=0.9")
req.Header.Set("Priority", "u=0, i")
req.Header.Set("Sec-Ch-Ua", `"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"`)
req.Header.Set("Sec-Ch-Ua-Mobile", "?0")
req.Header.Set("Sec-Ch-Ua-Platform", `"Windows"`)
req.Header.Set("Sec-Fetch-Dest", "document")
req.Header.Set("Sec-Fetch-Mode", "navigate")
req.Header.Set("Sec-Fetch-Site", "cross-site")
req.Header.Set("Sec-Fetch-User", "?1")
req.Header.Set("Upgrade-Insecure-Requests", "1")
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36")
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(bodyBytes), nil
}
return "", fmt.Errorf("bad response code %d", response.StatusCode)
return "", fmt.Errorf("bad response code %d", resp.StatusCode)
}

View File

@ -11,6 +11,7 @@ import (
"net/http"
"net/url"
"slices"
"time"
)
var (
@ -18,9 +19,19 @@ var (
HeaderThirdpartyBypass = "Thirdparty-Bypass"
)
func logProxyRequest(remoteAddr string, proxyHost string, targetHost string, returnCode *int, proxyConnectMode string, requestStartTime time.Time, errorMsg *string) {
msg := fmt.Sprintf(`%s -> %s -> %s -> %d -- %s -- %d ms`, remoteAddr, proxyHost, targetHost, *returnCode, proxyConnectMode, time.Since(requestStartTime).Milliseconds())
if *errorMsg == "" {
log.Infof(msg)
} else {
log.Errorf(`%s -- %s`, msg, *errorMsg)
}
}
func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter, req *http.Request) (string, string, string, string, *url.URL, error) {
if p.BalancerOnline.GetCount() != 0 {
errStr := "balancer is not ready"
urlHostname := req.URL.Hostname()
if p.BalancerReady.GetCount() != 0 {
errStr := "proxy is not ready"
http.Error(w, errStr, http.StatusServiceUnavailable)
return "", "", "", "", nil, errors.New(errStr)
}
@ -34,6 +45,12 @@ func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter,
return "", "", "", "", nil, errors.New(errStr)
}
if slices.Contains(config.GetConfig().BlockedDomains, urlHostname) {
errStr := "this domain has been blocked"
http.Error(w, errStr, http.StatusUnavailableForLegalReasons)
return "", "", "", "", nil, errors.New(errStr)
}
headerIncludeBrokenThirdparty := req.Header.Get(HeaderThirdpartyIncludeBroken)
req.Header.Del(HeaderThirdpartyIncludeBroken)
headerBypassThirdparty := req.Header.Get(HeaderThirdpartyBypass)
@ -45,15 +62,20 @@ func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter,
}
var selectedProxy string
if slices.Contains(config.GetConfig().ThirdpartyBypassDomains, req.URL.Hostname()) {
selectedProxy = p.getProxyFromOurs()
if val, ok := config.GetConfig().ResolveThrough[urlHostname]; ok {
selectedProxy = val
} else {
if headerIncludeBrokenThirdparty != "" {
selectedProxy = p.getProxyFromAllWithBroken()
} else if headerBypassThirdparty != "" {
if slices.Contains(config.GetConfig().ThirdpartyBypassDomains, urlHostname) {
selectedProxy = p.getProxyFromOurs()
} else {
selectedProxy = p.getProxyFromAll()
if headerIncludeBrokenThirdparty != "" {
selectedProxy = p.getProxyFromAllWithBroken()
} else if headerBypassThirdparty != "" {
selectedProxy = p.getProxyFromOurs()
} else {
selectedProxy = p.getProxyFromAll()
}
}
}
if selectedProxy == "" {
@ -72,14 +94,26 @@ func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter,
}
func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.Request) {
requestStartTime := time.Now()
remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr)
_, proxyUser, proxyPass, proxyHost, parsedProxyUrl, err := p.validateRequestAndGetProxy(w, req)
if err != nil {
// Error has already been handled, just log and return.
log.Errorf(`Failed to validate and get proxy: "%s"`, err)
if proxyHost == "" {
proxyHost = "none"
}
log.Debugf(`%s -> %s -> %s -- HTTP -- Rejecting request: %s`, remoteAddr, proxyHost, req.Host, err)
return
}
remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr)
defer log.Debugf(`%s -> %s -> %s -- HTTP`, remoteAddr, proxyHost, req.Host)
// Variables for later
var returnCode *int
returnCode = new(int)
*returnCode = -1
var errorMsg *string
errorMsg = new(string)
*errorMsg = ""
defer logProxyRequest(remoteAddr, proxyHost, req.Host, returnCode, "HTTP", requestStartTime, errorMsg)
parsedProxyUrl.Scheme = "http"
if proxyUser != "" && proxyPass != "" {
@ -94,7 +128,7 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
proxyReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body)
if err != nil {
log.Errorf(`Failed to make %s request to "%s": "%s"`, req.Method, req.URL.String(), err)
*errorMsg = fmt.Sprintf(`Failed to make %s request to "%s": %s`, req.Method, req.URL.String(), err)
http.Error(w, "failed to make request to downstream", http.StatusInternalServerError)
return
}
@ -104,11 +138,12 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
resp, err := client.Do(proxyReq)
if err != nil {
log.Errorf(`Failed to execute %s request to "%s": "%s"`, req.Method, req.URL.String(), err)
*errorMsg = fmt.Sprintf(`Failed to execute %s request to "%s": %s`, req.Method, req.URL.String(), err)
http.Error(w, "failed to execute request to downstream", http.StatusServiceUnavailable)
return
}
defer resp.Body.Close()
*returnCode = resp.StatusCode
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
@ -116,20 +151,30 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
}
func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http.Request) {
requestStartTime := time.Now()
remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr)
targetHost, _, _ := net.SplitHostPort(req.Host)
_, proxyUser, proxyPass, proxyHost, _, err := p.validateRequestAndGetProxy(w, req)
if err != nil {
// Error has already been handled, just log and return.
log.Errorf(`Failed to validate and get proxy: "%s"`, err)
if proxyHost == "" {
proxyHost = "none"
}
log.Debugf(`%s -> %s -> %s -- CONNECT -- Rejecting request: %s`, remoteAddr, proxyHost, targetHost, err)
return
}
remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr)
targetHost, _, _ := net.SplitHostPort(req.Host)
defer log.Debugf(`%s -> %s -> %s -- CONNECT`, remoteAddr, proxyHost, targetHost)
var returnCode *int
returnCode = new(int)
*returnCode = -1
var errorMsg *string
errorMsg = new(string)
*errorMsg = ""
defer logProxyRequest(remoteAddr, proxyHost, targetHost, returnCode, "CONNECT", requestStartTime, errorMsg)
// Connect to the downstream proxy server instead of the target host
// Start a connection to the downstream proxy server.
proxyConn, err := net.DialTimeout("tcp", proxyHost, config.GetConfig().ProxyConnectTimeout)
if err != nil {
log.Errorf(`Failed to dial proxy %s - %s`, proxyHost, err)
*errorMsg = fmt.Sprintf(`Failed to dial proxy %s - %s`, proxyHost, err)
http.Error(w, "failed to make request to downstream", http.StatusServiceUnavailable)
return
}
@ -145,38 +190,26 @@ func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http
return
}
resp, err := http.ReadResponse(bufio.NewReader(proxyConn), req)
if err != nil || resp.StatusCode != 200 {
var errStr string
if err != nil {
// `err` may be nil
errStr = err.Error()
}
statusCode := -1
if resp != nil {
statusCode = resp.StatusCode
}
log.Errorf(`Failed to CONNECT to %s using proxy %s. Status code : %d - "%s"`, req.Host, proxyHost, statusCode, errStr)
// Return the original status code.
returnStatusCode := http.StatusServiceUnavailable
if statusCode != -1 {
returnStatusCode = statusCode
}
http.Error(w, "failed to execute request to downstream", returnStatusCode)
if resp == nil {
*errorMsg = fmt.Sprintf(`Failed to CONNECT to %s using proxy %s: %s`, req.Host, proxyHost, err)
http.Error(w, "failed to execute request to downstream", http.StatusServiceUnavailable)
return
} else if err != nil {
log.Warnf(`Error while performing CONNECT to %s using proxy %s: %s`, req.Host, proxyHost, err)
}
*returnCode = resp.StatusCode
w.WriteHeader(http.StatusOK)
hj, ok := w.(http.Hijacker)
if !ok {
log.Errorf(`Failed to forward connection to %s using proxy %s`, req.Host, proxyHost)
*errorMsg = fmt.Sprintf(`Failed to forward connection to %s using proxy %s`, req.Host, proxyHost)
http.Error(w, "failed to forward connection to downstream", http.StatusServiceUnavailable)
return
}
clientConn, _, err := hj.Hijack()
if err != nil {
log.Errorf(`Failed to execute connection forwarding to %s using proxy %s`, req.Host, proxyHost)
*errorMsg = fmt.Sprintf(`Failed to execute connection forwarding to %s using proxy %s`, req.Host, proxyHost)
http.Error(w, "failed to execute connection forwarding to downstream", http.StatusServiceUnavailable)
return
}

View File

@ -11,13 +11,17 @@ import (
type ForwardProxyCluster struct {
mu sync.RWMutex
ourOnlineProxies []string
ourOfflineProxies []string
thirdpartyOnlineProxies []string
thirdpartyBrokenProxies []string
thirdpartyOfflineProxies []string
ipAddresses []string
BalancerOnline WaitGroupCountable
BalancerReady WaitGroupCountable
BalancerOnline bool
currentProxyAll int32
currentProxyOurs int32
currentProxyAllWithBroken int32
refreshInProgress bool
}
var log *logrus.Logger
@ -31,23 +35,31 @@ func NewForwardProxyCluster() *ForwardProxyCluster {
atomic.StoreInt32(&p.currentProxyAll, 0)
atomic.StoreInt32(&p.currentProxyOurs, 0)
atomic.StoreInt32(&p.currentProxyAllWithBroken, 0)
p.BalancerOnline.Add(1)
p.BalancerReady.Add(1)
p.BalancerOnline = false
return p
}
func (p *ForwardProxyCluster) cycleProxy(validProxies []string, currentProxy *int32) string {
// Just round robin
currProxy := atomic.LoadInt32(currentProxy)
p.mu.RLock()
defer p.mu.RUnlock()
currProxy := int(atomic.LoadInt32(currentProxy))
if currProxy > len(validProxies)-1 {
// This might happen if the background thread is currently making changes to the list of proxies.
// Just set it to the current length of the proxies array and move on.
currProxy = len(validProxies) - 1
}
downstreamProxy := validProxies[currProxy]
newCurrentProxy := (currProxy + 1) % int32(len(validProxies))
atomic.StoreInt32(currentProxy, newCurrentProxy)
newCurrentProxy := (currProxy + 1) % len(validProxies)
atomic.StoreInt32(currentProxy, int32(newCurrentProxy))
return downstreamProxy
}
func (p *ForwardProxyCluster) getProxyFromAll() string {
p.mu.RLock()
defer p.mu.RUnlock()
validProxies := removeDuplicates(append(p.ourOnlineProxies, p.thirdpartyOnlineProxies...))
validProxies := append(p.ourOnlineProxies, p.thirdpartyOnlineProxies...)
return p.cycleProxy(validProxies, &p.currentProxyAll)
}
@ -61,7 +73,7 @@ func (p *ForwardProxyCluster) getProxyFromOurs() string {
func (p *ForwardProxyCluster) getProxyFromAllWithBroken() string {
p.mu.RLock()
defer p.mu.RUnlock()
validProxies := removeDuplicates(slices.Concat(p.ourOnlineProxies, p.thirdpartyBrokenProxies, p.thirdpartyOnlineProxies))
validProxies := slices.Concat(p.ourOnlineProxies, p.thirdpartyBrokenProxies, p.thirdpartyOnlineProxies)
return p.cycleProxy(validProxies, &p.currentProxyAllWithBroken)
}

View File

@ -5,7 +5,9 @@ import (
"fmt"
"math"
"math/rand"
"net"
"net/http"
"net/url"
"time"
)
@ -23,6 +25,8 @@ func (p *ForwardProxyCluster) ServeHTTP(w http.ResponseWriter, req *http.Request
// HTTP
if req.URL.Scheme == "" {
// When the client connects using the server as a web server.
remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr)
defer log.Infof(`%s -- %s`, remoteAddr, req.URL.Path)
if req.URL.Path == "/" {
rand.New(rand.NewSource(time.Now().Unix()))
fmt.Fprint(w, "proxy-loadbalancer <https://git.evulid.cc/cyberes/proxy-loadbalancer>\nSee /json for status info.\n\n\n\n"+retardation[rand.Intn(len(retardation))])
@ -30,14 +34,19 @@ func (p *ForwardProxyCluster) ServeHTTP(w http.ResponseWriter, req *http.Request
} else if req.URL.Path == "/json" {
p.mu.RLock()
response := map[string]interface{}{
"uptime": int(math.Round(time.Since(startTime).Seconds())),
"online": p.BalancerOnline.GetCount() == 0,
"uptime": int(math.Round(time.Since(startTime).Seconds())),
"online": p.BalancerOnline && p.BalancerReady.GetCount() == 0,
"refreshInProgress": p.refreshInProgress,
"proxies": map[string]interface{}{
"totalOnline": len(p.ourOnlineProxies) + len(p.thirdpartyOnlineProxies),
"ours": p.ourOnlineProxies,
"ours": map[string]interface{}{
"online": removeCredentials(p.ourOnlineProxies),
"offline": removeCredentials(p.ourOfflineProxies),
},
"thirdParty": map[string]interface{}{
"online": p.thirdpartyOnlineProxies,
"broken": p.thirdpartyBrokenProxies,
"online": removeCredentials(p.thirdpartyOnlineProxies),
"broken": removeCredentials(p.thirdpartyBrokenProxies),
"offline": removeCredentials(p.thirdpartyOfflineProxies),
},
},
}
@ -62,3 +71,20 @@ func (p *ForwardProxyCluster) ServeHTTP(w http.ResponseWriter, req *http.Request
}
}
}
func removeCredentials(proxyURLs []string) []string {
var newURLs []string
for _, proxyURL := range proxyURLs {
u, err := url.Parse(proxyURL)
if err != nil {
// Skip if invalid.
continue
}
u.User = nil
newURLs = append(newURLs, u.String())
}
if len(newURLs) == 0 {
newURLs = make([]string, 0)
}
return newURLs
}

View File

@ -17,22 +17,29 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
ctx := context.TODO()
for {
// TODO: need to have these be temp vars and then copy them over when finished
startTime := time.Now()
p.refreshInProgress = true
allProxies := removeDuplicates(append(config.GetConfig().ProxyPoolOurs, config.GetConfig().ProxyPoolThirdparty...))
p.ourOnlineProxies = make([]string, 0)
p.thirdpartyOnlineProxies = make([]string, 0)
p.thirdpartyBrokenProxies = make([]string, 0)
p.ipAddresses = make([]string, 0)
newOurOnlineProxies := make([]string, 0)
newOurOfflineProxies := make([]string, 0)
newThirdpartyOnlineProxies := make([]string, 0)
newThirdpartyBrokenProxies := make([]string, 0)
newThirdpartyOfflineProxies := make([]string, 0)
newIpAddresses := make([]string, 0)
var wg sync.WaitGroup
for _, pxy := range allProxies {
wg.Add(1)
// TODO: semaphore to limit active checks
go func(pxy string) {
defer wg.Done()
if err := sem.Acquire(ctx, 1); err != nil {
log.Errorf("Validate - failed to acquire semaphore: %v\n", err)
log.Errorf("Validate - failed to acquire semaphore: %v", err)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
defer sem.Release(1)
@ -40,58 +47,88 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
_, _, proxyHost, _, err := splitProxyURL(pxy)
if err != nil {
log.Errorf(`Invalid proxy "%s"`, pxy)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
// Test the proxy.
ipAddr, testErr := sendRequestThroughProxy(pxy, config.GetConfig().IpCheckerURL)
if testErr != nil {
log.Warnf("Validate - proxy %s failed: %s", proxyHost, testErr)
log.Debugf("Validate - %s failed: %s", proxyHost, testErr)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
if slices.Contains(p.ipAddresses, ipAddr) {
log.Warnf("Validate - duplicate IP Address %s for proxy %s", ipAddr, proxyHost)
if slices.Contains(newIpAddresses, ipAddr) {
log.Warnf("Validate - duplicate IP Address %s for %s", ipAddr, proxyHost)
if isThirdparty(pxy) {
newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy)
} else {
newOurOfflineProxies = append(newOurOfflineProxies, pxy)
}
return
}
p.ipAddresses = append(p.ipAddresses, ipAddr)
newIpAddresses = append(newIpAddresses, ipAddr)
// Sort the proxy into the right groups.
if isThirdparty(pxy) {
p.mu.Lock()
p.thirdpartyOnlineProxies = append(p.thirdpartyOnlineProxies, pxy)
p.mu.Unlock()
okToAdd := true
for _, d := range config.GetConfig().ThirdpartyTestUrls {
_, bv3hiErr := sendRequestThroughProxy(pxy, d)
if bv3hiErr != nil {
log.Debugf("Validate - Third-party %s failed: %s\n", proxyHost, bv3hiErr)
p.thirdpartyBrokenProxies = append(p.thirdpartyBrokenProxies, pxy)
okToAdd = false
newThirdpartyBrokenProxies = append(newThirdpartyBrokenProxies, pxy)
log.Debugf("Validate - %s failed third-party test: %s", proxyHost, bv3hiErr)
break
}
}
if okToAdd {
newThirdpartyOnlineProxies = append(newThirdpartyOnlineProxies, pxy)
}
} else {
p.mu.Lock()
p.ourOnlineProxies = append(p.ourOnlineProxies, pxy)
p.mu.Unlock()
newOurOnlineProxies = append(newOurOnlineProxies, pxy)
}
}(pxy)
}
wg.Wait()
if !started {
p.mu.Lock()
p.ourOnlineProxies = removeDuplicates(newOurOnlineProxies)
p.ourOfflineProxies = newOurOfflineProxies
p.thirdpartyOnlineProxies = removeDuplicates(newThirdpartyOnlineProxies)
p.thirdpartyBrokenProxies = removeDuplicates(newThirdpartyBrokenProxies)
p.thirdpartyOfflineProxies = newThirdpartyOfflineProxies
p.ipAddresses = removeDuplicates(newIpAddresses)
p.BalancerOnline = len(slices.Concat(p.ourOnlineProxies, p.thirdpartyOnlineProxies, p.thirdpartyBrokenProxies)) > 0 // Online only if there are active and online proxies.
p.mu.Unlock()
if config.GetConfig().ShuffleProxies {
p.mu.Lock()
p.ourOnlineProxies = shuffle(p.ourOnlineProxies)
p.thirdpartyOnlineProxies = shuffle(p.thirdpartyOnlineProxies)
p.mu.Unlock()
}
if !started {
started = true
p.BalancerOnline.Done()
p.BalancerReady.Done()
}
p.mu.RLock()
log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d\n",
len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+(len(p.thirdpartyOnlineProxies)-len(p.thirdpartyBrokenProxies)))
log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d, Elapsed: %s",
len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+len(p.thirdpartyOnlineProxies), time.Since(startTime),
)
p.mu.RUnlock()
time.Sleep(60 * time.Second)
p.refreshInProgress = false
time.Sleep(time.Duration(config.GetConfig().ProxyCheckInterval) * time.Second)
}
}
@ -115,12 +152,9 @@ func removeDuplicates(elements []string) []string {
if encountered[elements[v]] == true {
// Do not add duplicate.
} else {
// Record this element as an encountered element.
encountered[elements[v]] = true
// Append to result slice.
result = append(result, elements[v])
}
}
// Return the new slice.
return result
}