diff --git a/README.md b/README.md index d4b06bf..48a6a45 100644 --- a/README.md +++ b/README.md @@ -28,10 +28,11 @@ The server displays stats and info at `/json` === Proxy Load Balancer === Usage of ./proxy-loadbalancer: --config [string] - Path to the config file - --d, --debug - Enable debug mode - --v Print version and exit + Path to the config file + -d, --debug + Enable debug mode + --v Print version and exit + -h, --help Print this help message ``` ## Special Headers diff --git a/config.example.yml b/config.example.yml index d5e9ef9..14a10b1 100644 --- a/config.example.yml +++ b/config.example.yml @@ -1,14 +1,14 @@ # Port to run on. -http_port: 9000 +http_port: 9000 # How many proxies will be checked at once? -proxy_checkers: 50 +proxy_checkers: 50 # URL to get a proxy's IP. -ip_checker_url: https://api.ipify.org +ip_checker_url: https://api.ipify.org # Connection timeout for the proxies in seconds. -proxy_connect_timeout: 10 +proxy_connect_timeout: 60 # Your proxies. proxy_pool_ours: @@ -27,4 +27,8 @@ thirdparty_test_urls: # Don't route requests for these domains through the third-party proxies. thirdparty_bypass_domains: - - twitter.com \ No newline at end of file + - twitter.com + +# Shuffle the proxy lists whenever the background thread refreshes them. +# If false, round-robin on default order. +shuffle_proxies: false diff --git a/src/config/config.go b/src/config/config.go index e70e14a..7b60fee 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -18,6 +18,7 @@ type Config struct { ProxyPoolThirdparty []string ThirdpartyTestUrls []string ThirdpartyBypassDomains []string + ShuffleProxies bool } func SetConfig(configFile string) (*Config, error) { @@ -29,11 +30,12 @@ func SetConfig(configFile string) (*Config, error) { viper.SetConfigFile(configFile) viper.SetDefault("http_port", "5000") viper.SetDefault("proxy_checkers", 50) - viper.SetDefault("proxy_connect_timeout", 10) + viper.SetDefault("proxy_connect_timeout", 60) viper.SetDefault("proxy_pool_ours", make([]string, 0)) viper.SetDefault("proxy_pool_thirdparty", make([]string, 0)) viper.SetDefault("thirdparty_test_urls", make([]string, 0)) viper.SetDefault("thirdparty_bypass_domains", make([]string, 0)) + viper.SetDefault("shuffle_proxies", false) err := viper.ReadInConfig() if err != nil { @@ -48,6 +50,11 @@ func SetConfig(configFile string) (*Config, error) { ProxyPoolThirdparty: viper.GetStringSlice("proxy_pool_thirdparty"), ThirdpartyTestUrls: viper.GetStringSlice("thirdparty_test_urls"), ThirdpartyBypassDomains: viper.GetStringSlice("thirdparty_bypass_domains"), + ShuffleProxies: viper.GetBool("shuffle_proxies"), + } + + if len(config.ProxyPoolOurs) == 0 && len(config.ProxyPoolThirdparty) == 0 { + return nil, errors.New("no proxies configured") } if config.IpCheckerURL == "" { diff --git a/src/proxy-loadbalancer.go b/src/proxy-loadbalancer.go index 7f9f459..b7d79a1 100644 --- a/src/proxy-loadbalancer.go +++ b/src/proxy-loadbalancer.go @@ -89,7 +89,7 @@ func main() { log.Infof("-> Server started on 0.0.0.0:%s <-", configData.HTTPPort) go proxyCluster.ValidateProxiesThread() - proxyCluster.BalancerOnline.Wait() + proxyCluster.BalancerReady.Wait() log.Infoln("-> Proxy server accepting requests <-") select {} diff --git a/src/proxy/handleConnect.go b/src/proxy/handleConnect.go index 0f73182..0cecd6b 100644 --- a/src/proxy/handleConnect.go +++ b/src/proxy/handleConnect.go @@ -24,7 +24,7 @@ func logProxyRequest(remoteAddr string, proxyHost string, targetHost string, ret } func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter, req *http.Request) (string, string, string, string, *url.URL, error) { - if p.BalancerOnline.GetCount() != 0 { + if p.BalancerReady.GetCount() != 0 { errStr := "proxy is not ready" http.Error(w, errStr, http.StatusServiceUnavailable) return "", "", "", "", nil, errors.New(errStr) diff --git a/src/proxy/proxy.go b/src/proxy/proxy.go index 3c7f3fd..691cd23 100644 --- a/src/proxy/proxy.go +++ b/src/proxy/proxy.go @@ -11,13 +11,17 @@ import ( type ForwardProxyCluster struct { mu sync.RWMutex ourOnlineProxies []string + ourOfflineProxies []string thirdpartyOnlineProxies []string thirdpartyBrokenProxies []string + thirdpartyOfflineProxies []string ipAddresses []string - BalancerOnline WaitGroupCountable + BalancerReady WaitGroupCountable + BalancerOnline bool currentProxyAll int32 currentProxyOurs int32 currentProxyAllWithBroken int32 + refreshInProgress bool } var log *logrus.Logger @@ -31,7 +35,8 @@ func NewForwardProxyCluster() *ForwardProxyCluster { atomic.StoreInt32(&p.currentProxyAll, 0) atomic.StoreInt32(&p.currentProxyOurs, 0) atomic.StoreInt32(&p.currentProxyAllWithBroken, 0) - p.BalancerOnline.Add(1) + p.BalancerReady.Add(1) + p.BalancerOnline = false return p } diff --git a/src/proxy/serve.go b/src/proxy/serve.go index c231798..fabaa72 100644 --- a/src/proxy/serve.go +++ b/src/proxy/serve.go @@ -34,14 +34,19 @@ func (p *ForwardProxyCluster) ServeHTTP(w http.ResponseWriter, req *http.Request } else if req.URL.Path == "/json" { p.mu.RLock() response := map[string]interface{}{ - "uptime": int(math.Round(time.Since(startTime).Seconds())), - "online": p.BalancerOnline.GetCount() == 0, + "uptime": int(math.Round(time.Since(startTime).Seconds())), + "online": p.BalancerOnline && p.BalancerReady.GetCount() == 0, + "refreshInProgress": p.refreshInProgress, "proxies": map[string]interface{}{ "totalOnline": len(p.ourOnlineProxies) + len(p.thirdpartyOnlineProxies), - "ours": removeCredentials(p.ourOnlineProxies), + "ours": map[string]interface{}{ + "online": removeCredentials(p.ourOnlineProxies), + "offline": removeCredentials(p.ourOfflineProxies), + }, "thirdParty": map[string]interface{}{ - "online": removeCredentials(p.thirdpartyOnlineProxies), - "broken": removeCredentials(p.thirdpartyBrokenProxies), + "online": removeCredentials(p.thirdpartyOnlineProxies), + "broken": removeCredentials(p.thirdpartyBrokenProxies), + "offline": removeCredentials(p.thirdpartyOfflineProxies), }, }, } @@ -72,10 +77,14 @@ func removeCredentials(proxyURLs []string) []string { for _, proxyURL := range proxyURLs { u, err := url.Parse(proxyURL) if err != nil { - return nil + // Skip if invalid. + continue } u.User = nil newURLs = append(newURLs, u.String()) } + if len(newURLs) == 0 { + newURLs = make([]string, 0) + } return newURLs } diff --git a/src/proxy/threads.go b/src/proxy/threads.go index 7f98f7b..650361a 100644 --- a/src/proxy/threads.go +++ b/src/proxy/threads.go @@ -17,10 +17,13 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { ctx := context.TODO() for { + p.refreshInProgress = true allProxies := removeDuplicates(append(config.GetConfig().ProxyPoolOurs, config.GetConfig().ProxyPoolThirdparty...)) newOurOnlineProxies := make([]string, 0) + newOurOfflineProxies := make([]string, 0) newThirdpartyOnlineProxies := make([]string, 0) newThirdpartyBrokenProxies := make([]string, 0) + newThirdpartyOfflineProxies := make([]string, 0) newIpAddresses := make([]string, 0) var wg sync.WaitGroup @@ -31,6 +34,11 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { if err := sem.Acquire(ctx, 1); err != nil { log.Errorf("Validate - failed to acquire semaphore: %v", err) + if isThirdparty(pxy) { + newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy) + } else { + newOurOfflineProxies = append(newOurOfflineProxies, pxy) + } return } defer sem.Release(1) @@ -38,6 +46,11 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { _, _, proxyHost, _, err := splitProxyURL(pxy) if err != nil { log.Errorf(`Invalid proxy "%s"`, pxy) + if isThirdparty(pxy) { + newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy) + } else { + newOurOfflineProxies = append(newOurOfflineProxies, pxy) + } return } @@ -45,10 +58,20 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { ipAddr, testErr := sendRequestThroughProxy(pxy, config.GetConfig().IpCheckerURL) if testErr != nil { log.Warnf("Validate - proxy %s failed: %s", proxyHost, testErr) + if isThirdparty(pxy) { + newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy) + } else { + newOurOfflineProxies = append(newOurOfflineProxies, pxy) + } return } if slices.Contains(newIpAddresses, ipAddr) { log.Warnf("Validate - duplicate IP Address %s for proxy %s", ipAddr, proxyHost) + if isThirdparty(pxy) { + newThirdpartyOfflineProxies = append(newThirdpartyOfflineProxies, pxy) + } else { + newOurOfflineProxies = append(newOurOfflineProxies, pxy) + } return } newIpAddresses = append(newIpAddresses, ipAddr) @@ -73,18 +96,24 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { p.mu.Lock() p.ourOnlineProxies = removeDuplicates(newOurOnlineProxies) + p.ourOfflineProxies = newOurOfflineProxies p.thirdpartyOnlineProxies = removeDuplicates(newThirdpartyOnlineProxies) p.thirdpartyBrokenProxies = removeDuplicates(newThirdpartyBrokenProxies) + p.thirdpartyOfflineProxies = newThirdpartyOfflineProxies p.ipAddresses = removeDuplicates(newIpAddresses) + p.BalancerOnline = len(slices.Concat(p.ourOnlineProxies, p.thirdpartyOnlineProxies, p.thirdpartyBrokenProxies)) > 0 // Online only if there are active and online proxies. p.mu.Unlock() - if !started { + if config.GetConfig().ShuffleProxies { p.mu.Lock() p.ourOnlineProxies = shuffle(p.ourOnlineProxies) p.thirdpartyOnlineProxies = shuffle(p.thirdpartyOnlineProxies) p.mu.Unlock() + } + + if !started { started = true - p.BalancerOnline.Done() + p.BalancerReady.Done() } p.mu.RLock() @@ -92,6 +121,7 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+(len(p.thirdpartyOnlineProxies)-len(p.thirdpartyBrokenProxies))) p.mu.RUnlock() + p.refreshInProgress = false time.Sleep(60 * time.Second) } }