improve background thread resouce locking, adjust printing

This commit is contained in:
Cyberes 2024-04-12 20:44:05 -06:00
parent fb124561eb
commit 5273d1b16f
1 changed files with 19 additions and 18 deletions

View File

@ -17,22 +17,20 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
ctx := context.TODO() ctx := context.TODO()
for { for {
// TODO: need to have these be temp vars and then copy them over when finished
allProxies := removeDuplicates(append(config.GetConfig().ProxyPoolOurs, config.GetConfig().ProxyPoolThirdparty...)) allProxies := removeDuplicates(append(config.GetConfig().ProxyPoolOurs, config.GetConfig().ProxyPoolThirdparty...))
p.ourOnlineProxies = make([]string, 0) newOurOnlineProxies := make([]string, 0)
p.thirdpartyOnlineProxies = make([]string, 0) newThirdpartyOnlineProxies := make([]string, 0)
p.thirdpartyBrokenProxies = make([]string, 0) newThirdpartyBrokenProxies := make([]string, 0)
p.ipAddresses = make([]string, 0) newIpAddresses := make([]string, 0)
var wg sync.WaitGroup var wg sync.WaitGroup
for _, pxy := range allProxies { for _, pxy := range allProxies {
wg.Add(1) wg.Add(1)
// TODO: semaphore to limit active checks
go func(pxy string) { go func(pxy string) {
defer wg.Done() defer wg.Done()
if err := sem.Acquire(ctx, 1); err != nil { if err := sem.Acquire(ctx, 1); err != nil {
log.Errorf("Validate - failed to acquire semaphore: %v\n", err) log.Errorf("Validate - failed to acquire semaphore: %v", err)
return return
} }
defer sem.Release(1) defer sem.Release(1)
@ -49,34 +47,37 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
log.Warnf("Validate - proxy %s failed: %s", proxyHost, testErr) log.Warnf("Validate - proxy %s failed: %s", proxyHost, testErr)
return return
} }
if slices.Contains(p.ipAddresses, ipAddr) { if slices.Contains(newIpAddresses, ipAddr) {
log.Warnf("Validate - duplicate IP Address %s for proxy %s", ipAddr, proxyHost) log.Warnf("Validate - duplicate IP Address %s for proxy %s", ipAddr, proxyHost)
return return
} }
p.ipAddresses = append(p.ipAddresses, ipAddr) newIpAddresses = append(newIpAddresses, ipAddr)
// Sort the proxy into the right groups. // Sort the proxy into the right groups.
if isThirdparty(pxy) { if isThirdparty(pxy) {
p.mu.Lock() newThirdpartyOnlineProxies = append(newThirdpartyOnlineProxies, pxy)
p.thirdpartyOnlineProxies = append(p.thirdpartyOnlineProxies, pxy)
p.mu.Unlock()
for _, d := range config.GetConfig().ThirdpartyTestUrls { for _, d := range config.GetConfig().ThirdpartyTestUrls {
_, bv3hiErr := sendRequestThroughProxy(pxy, d) _, bv3hiErr := sendRequestThroughProxy(pxy, d)
if bv3hiErr != nil { if bv3hiErr != nil {
log.Debugf("Validate - Third-party %s failed: %s\n", proxyHost, bv3hiErr) log.Debugf("Validate - Third-party %s failed: %s", proxyHost, bv3hiErr)
p.thirdpartyBrokenProxies = append(p.thirdpartyBrokenProxies, pxy) newThirdpartyBrokenProxies = append(newThirdpartyBrokenProxies, pxy)
} }
} }
} else { } else {
p.mu.Lock() newOurOnlineProxies = append(newOurOnlineProxies, pxy)
p.ourOnlineProxies = append(p.ourOnlineProxies, pxy)
p.mu.Unlock()
} }
}(pxy) }(pxy)
} }
wg.Wait() wg.Wait()
p.mu.Lock()
p.ourOnlineProxies = newOurOnlineProxies
p.thirdpartyOnlineProxies = newThirdpartyOnlineProxies
p.thirdpartyBrokenProxies = newThirdpartyBrokenProxies
p.ipAddresses = newIpAddresses
p.mu.Unlock()
if !started { if !started {
p.mu.Lock() p.mu.Lock()
p.ourOnlineProxies = shuffle(p.ourOnlineProxies) p.ourOnlineProxies = shuffle(p.ourOnlineProxies)
@ -87,7 +88,7 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() {
} }
p.mu.RLock() p.mu.RLock()
log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d\n", log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d",
len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+(len(p.thirdpartyOnlineProxies)-len(p.thirdpartyBrokenProxies))) len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+(len(p.thirdpartyOnlineProxies)-len(p.thirdpartyBrokenProxies)))
p.mu.RUnlock() p.mu.RUnlock()