From 5273d1b16ff7a2ff26dae52c404a40e504e9f3c2 Mon Sep 17 00:00:00 2001 From: Cyberes Date: Fri, 12 Apr 2024 20:44:05 -0600 Subject: [PATCH] improve background thread resouce locking, adjust printing --- src/proxy/threads.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/proxy/threads.go b/src/proxy/threads.go index 6685d76..e3c6f5c 100644 --- a/src/proxy/threads.go +++ b/src/proxy/threads.go @@ -17,22 +17,20 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { ctx := context.TODO() for { - // TODO: need to have these be temp vars and then copy them over when finished allProxies := removeDuplicates(append(config.GetConfig().ProxyPoolOurs, config.GetConfig().ProxyPoolThirdparty...)) - p.ourOnlineProxies = make([]string, 0) - p.thirdpartyOnlineProxies = make([]string, 0) - p.thirdpartyBrokenProxies = make([]string, 0) - p.ipAddresses = make([]string, 0) + newOurOnlineProxies := make([]string, 0) + newThirdpartyOnlineProxies := make([]string, 0) + newThirdpartyBrokenProxies := make([]string, 0) + newIpAddresses := make([]string, 0) var wg sync.WaitGroup for _, pxy := range allProxies { wg.Add(1) - // TODO: semaphore to limit active checks go func(pxy string) { defer wg.Done() if err := sem.Acquire(ctx, 1); err != nil { - log.Errorf("Validate - failed to acquire semaphore: %v\n", err) + log.Errorf("Validate - failed to acquire semaphore: %v", err) return } defer sem.Release(1) @@ -49,34 +47,37 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { log.Warnf("Validate - proxy %s failed: %s", proxyHost, testErr) return } - if slices.Contains(p.ipAddresses, ipAddr) { + if slices.Contains(newIpAddresses, ipAddr) { log.Warnf("Validate - duplicate IP Address %s for proxy %s", ipAddr, proxyHost) return } - p.ipAddresses = append(p.ipAddresses, ipAddr) + newIpAddresses = append(newIpAddresses, ipAddr) // Sort the proxy into the right groups. if isThirdparty(pxy) { - p.mu.Lock() - p.thirdpartyOnlineProxies = append(p.thirdpartyOnlineProxies, pxy) - p.mu.Unlock() + newThirdpartyOnlineProxies = append(newThirdpartyOnlineProxies, pxy) for _, d := range config.GetConfig().ThirdpartyTestUrls { _, bv3hiErr := sendRequestThroughProxy(pxy, d) if bv3hiErr != nil { - log.Debugf("Validate - Third-party %s failed: %s\n", proxyHost, bv3hiErr) - p.thirdpartyBrokenProxies = append(p.thirdpartyBrokenProxies, pxy) + log.Debugf("Validate - Third-party %s failed: %s", proxyHost, bv3hiErr) + newThirdpartyBrokenProxies = append(newThirdpartyBrokenProxies, pxy) } } } else { - p.mu.Lock() - p.ourOnlineProxies = append(p.ourOnlineProxies, pxy) - p.mu.Unlock() + newOurOnlineProxies = append(newOurOnlineProxies, pxy) } }(pxy) } wg.Wait() + p.mu.Lock() + p.ourOnlineProxies = newOurOnlineProxies + p.thirdpartyOnlineProxies = newThirdpartyOnlineProxies + p.thirdpartyBrokenProxies = newThirdpartyBrokenProxies + p.ipAddresses = newIpAddresses + p.mu.Unlock() + if !started { p.mu.Lock() p.ourOnlineProxies = shuffle(p.ourOnlineProxies) @@ -87,7 +88,7 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { } p.mu.RLock() - log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d\n", + log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d", len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+(len(p.thirdpartyOnlineProxies)-len(p.thirdpartyBrokenProxies))) p.mu.RUnlock()