prototype new golang loadbalancer

This commit is contained in:
Cyberes 2024-04-12 01:26:45 -06:00
parent da9c2b1635
commit d1bea64203
13 changed files with 473 additions and 1 deletions

3
.gitignore vendored
View File

@ -1,6 +1,9 @@
.idea/
config.py
dist/
test.go
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/

View File

@ -2,6 +2,8 @@
_A round-robin load balancer for HTTP proxies._
does not support downstream https servers.
This is a simple load balancer using [proxy.py](https://github.com/abhinavsingh/proxy.py) that will route requests to a
cluster of proxy backends in a round-robin fashion. This makes it easy to connect your clients to a large number of
proxy
@ -23,4 +25,4 @@ To start the load balancer server, navigate to `./proxy-skeleton` and run `pytho
The load balancer accepts special headers to control its behavior.
- `Smartproxy-Bypass`: don't use any SmartProxy endpoints.
- `Smartproxy-Disable-BV3HI`: don't filter SmartProxy endpoints by the 503 connect error.
- `Smartproxy-Disable-BV3HI`: don't filter SmartProxy endpoints by the 503 connect error.

24
build.sh Executable file
View File

@ -0,0 +1,24 @@
#!/bin/bash
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
if [ -z ${1+x} ]; then
VERSION="0.0.0"
else
VERSION="$1"
fi
mkdir -p "$SCRIPT_DIR/dist"
rm "$SCRIPT_DIR"/dist/crazyfs-* &> /dev/null
BUILDARGS="$(uname)-$(uname -p)"
OUTPUTFILE="$SCRIPT_DIR/dist/crazyfs-$VERSION-$BUILDARGS"
cd "$SCRIPT_DIR/src" || exit 1
go mod tidy
go build -v -trimpath -ldflags "-s -w -X main.VersionDate=$(date -u --iso-8601=minutes) -X main.Version=v$VERSION" -o "$OUTPUTFILE"
if [ $? -eq 0 ]; then
chmod +x "$OUTPUTFILE"
echo "Build Succeeded -> $OUTPUTFILE"
fi

16
src/config/validate.go Normal file
View File

@ -0,0 +1,16 @@
package config
import (
"errors"
"fmt"
"strings"
)
func ValidateProxies(proxies []string) error {
for _, proxy := range proxies {
if !strings.HasPrefix("http://", proxy) {
return errors.New(fmt.Sprintf(`proxy "%s" must start with http://`, proxy))
}
}
return nil
}

10
src/go.mod Normal file
View File

@ -0,0 +1,10 @@
module main
go 1.22.1
require (
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5
golang.org/x/sync v0.7.0
)
require github.com/elazarl/goproxy/ext v0.0.0-20231117061959-7cc037d33fb5 // indirect

8
src/go.sum Normal file
View File

@ -0,0 +1,8 @@
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5 h1:m62nsMU279qRD9PQSWD1l66kmkXzuYcnVJqL4XLeV2M=
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8=
github.com/elazarl/goproxy/ext v0.0.0-20231117061959-7cc037d33fb5 h1:iGoePcl8bIDJxxRAL2Q4E4Rt35z5m917RJb8lAvdrQw=
github.com/elazarl/goproxy/ext v0.0.0-20231117061959-7cc037d33fb5/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8=
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=

69
src/proxy-loadbalancer.go Normal file
View File

@ -0,0 +1,69 @@
package main
import (
"flag"
"fmt"
"log"
"main/proxy"
"net/http"
"os"
"runtime/debug"
)
type cliConfig struct {
configFile string
initialCrawl bool
debug bool
disableElasticSync bool
help bool
version bool
}
var Version = "development"
var VersionDate = "not set"
func main() {
fmt.Println("=== Proxy Load Balancer ===")
cliArgs := parseArgs()
if cliArgs.help {
flag.Usage()
os.Exit(0)
}
if cliArgs.version {
buildInfo, ok := debug.ReadBuildInfo()
if ok {
buildSettings := make(map[string]string)
for i := range buildInfo.Settings {
buildSettings[buildInfo.Settings[i].Key] = buildInfo.Settings[i].Value
}
fmt.Printf("Version: %s\n\n", Version)
fmt.Printf("Date Compiled: %s\n", VersionDate)
fmt.Printf("Git Revision: %s\n", buildSettings["vcs.revision"])
fmt.Printf("Git Revision Date: %s\n", buildSettings["vcs.time"])
fmt.Printf("Git Modified: %s\n", buildSettings["vcs.modified"])
} else {
fmt.Println("Build info not available")
}
os.Exit(0)
}
proxyCluster := proxy.NewForwardProxyCluster()
go proxyCluster.ValidateProxiesThread()
proxyCluster.BalancerOnline.Wait()
go func() {
log.Fatal(http.ListenAndServe(":5000", proxyCluster))
}()
fmt.Println("Server started!")
select {}
}
func parseArgs() cliConfig {
var cliArgs cliConfig
flag.StringVar(&cliArgs.configFile, "config", "", "Path to the config file")
flag.BoolVar(&cliArgs.debug, "d", false, "Enable debug mode")
flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug mode")
flag.BoolVar(&cliArgs.version, "v", false, "Print version and exit")
flag.Parse()
return cliArgs
}

42
src/proxy/checkProxy.go Normal file
View File

@ -0,0 +1,42 @@
package proxy
import (
"fmt"
"io"
"net/http"
"net/url"
"time"
)
func sendRequestThroughProxy(proxyUrl string, targetURL string) (string, error) {
parsedProxyUrl, err := url.Parse(proxyUrl)
if err != nil {
return "", err
}
if IsSmartproxy(proxyUrl) {
// Set the username and password for proxy authentication if Smartproxy
parsedProxyUrl.User = url.UserPassword(smartproxyUsername, smartproxyPassword)
}
transport := &http.Transport{
Proxy: http.ProxyURL(parsedProxyUrl),
}
client := &http.Client{
Transport: transport,
Timeout: time.Second * 10,
}
response, err := client.Get(targetURL)
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(bodyBytes), nil
}
return "", fmt.Errorf("bad response code %d", response.StatusCode)
}

104
src/proxy/handleConnect.go Normal file
View File

@ -0,0 +1,104 @@
package proxy
import (
"bufio"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"strings"
"sync/atomic"
)
func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.Request) {
proxyURLParsed, _ := url.Parse(p.getProxy())
proxyURLParsed.Scheme = "http"
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyURLParsed),
},
}
proxyReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
copyHeader(proxyReq.Header, req.Header)
proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
resp, err := client.Do(proxyReq)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
defer resp.Body.Close()
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
func (p *ForwardProxyCluster) proxyHTTPSConnect(w http.ResponseWriter, req *http.Request) {
log.Printf("CONNECT requested to %v (from %v)", req.Host, req.RemoteAddr)
allValidProxies := append(p.ourOnlineProxies, p.smartproxyOnlineProxies...)
currentProxy := atomic.LoadInt32(&p.CurrentProxy)
downstreamProxy := allValidProxies[currentProxy]
downstreamProxy = strings.Replace(downstreamProxy, "http://", "", -1)
downstreamProxy = strings.Replace(downstreamProxy, "https://", "", -1)
newCurrentProxy := (currentProxy + 1) % int32(len(testProxies))
atomic.StoreInt32(&p.CurrentProxy, newCurrentProxy)
// Connect to the downstream proxy server instead of the target host
proxyConn, err := net.Dial("tcp", downstreamProxy)
if err != nil {
log.Println("failed to dial to proxy", downstreamProxy, err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
// Send a new CONNECT request to the downstream proxy
_, err = fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", req.Host, req.Host)
if err != nil {
return
}
resp, err := http.ReadResponse(bufio.NewReader(proxyConn), req)
if err != nil || resp.StatusCode != 200 {
log.Println("failed to CONNECT to target", req.Host)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
hj, ok := w.(http.Hijacker)
if !ok {
log.Fatal("http server doesn't support hijacking connection")
}
clientConn, _, err := hj.Hijack()
if err != nil {
log.Fatal("http hijacking failed")
}
log.Println("tunnel established")
go tunnelConn(proxyConn, clientConn)
go tunnelConn(clientConn, proxyConn)
}
func tunnelConn(dst io.WriteCloser, src io.ReadCloser) {
io.Copy(dst, src)
dst.Close()
src.Close()
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}

35
src/proxy/proxy.go Normal file
View File

@ -0,0 +1,35 @@
package proxy
import (
"sync"
"sync/atomic"
)
type ForwardProxyCluster struct {
// TODO: mutex rwlock
ourOnlineProxies []string
smartproxyOnlineProxies []string
smartproxyBrokenProxies []string
ipAddresses []string
BalancerOnline sync.WaitGroup
CurrentProxy int32
}
// TODO: move all smartproxy things to "thirdparty"
func NewForwardProxyCluster() *ForwardProxyCluster {
p := &ForwardProxyCluster{}
atomic.StoreInt32(&p.CurrentProxy, 0)
p.BalancerOnline.Add(1)
return p
}
func (p *ForwardProxyCluster) getProxy() string {
// Just round robin
allValidProxies := append(p.ourOnlineProxies, p.smartproxyOnlineProxies...)
currentProxy := atomic.LoadInt32(&p.CurrentProxy)
downstreamProxy := allValidProxies[currentProxy]
newCurrentProxy := (currentProxy + 1) % int32(len(testProxies))
atomic.StoreInt32(&p.CurrentProxy, newCurrentProxy)
return downstreamProxy
}

23
src/proxy/serve.go Normal file
View File

@ -0,0 +1,23 @@
package proxy
import (
"fmt"
"log"
"net/http"
)
func (p *ForwardProxyCluster) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodConnect {
// HTTPS
p.proxyHTTPSConnect(w, req)
} else {
// HTTP
if req.URL.Scheme != "http" {
msg := fmt.Sprintf(`unsupported protocal "%s"`, req.URL.Scheme)
http.Error(w, msg, http.StatusBadRequest)
log.Println(msg)
return
}
p.proxyHttpConnect(w, req)
}
}

14
src/proxy/string.go Normal file
View File

@ -0,0 +1,14 @@
package proxy
import (
"net/url"
"strings"
)
func IsSmartproxy(proxyUrl string) bool {
parsedProxyUrl, err := url.Parse(proxyUrl)
if err != nil {
panic(err)
}
return strings.Split(parsedProxyUrl.Host, ":")[0] == "dc.smartproxy.com"
}

122
src/proxy/validate.go Normal file
View File

@ -0,0 +1,122 @@
package proxy
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"log"
"math/rand"
"slices"
"sync"
"time"
)
func (p *ForwardProxyCluster) ValidateProxiesThread() {
log.Println("Doing initial backend check, please wait...")
started := false
// TODO: config value
var sem = semaphore.NewWeighted(int64(50))
ctx := context.TODO()
for {
// TODO: need to have these be temp vars and then copy them over when finished
allProxies := removeDuplicates(append(testProxies, testSmartproxyPool...))
p.ourOnlineProxies = make([]string, 0)
p.smartproxyOnlineProxies = make([]string, 0)
p.smartproxyBrokenProxies = make([]string, 0)
p.ipAddresses = make([]string, 0)
var wg sync.WaitGroup
for _, pxy := range allProxies {
wg.Add(1)
// TODO: semaphore to limit active checks
go func(pxy string) {
defer wg.Done()
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("Failed to acquire semaphore: %v\n", err)
return
}
defer sem.Release(1)
// Test the proxy.
ipAddr, testErr := sendRequestThroughProxy(pxy, testTargetUrl)
if testErr != nil {
fmt.Printf("Proxy %s failed: %s\n", pxy, testErr)
return
}
if slices.Contains(p.ipAddresses, ipAddr) {
fmt.Printf("Duplicate IP Address %s for proxy %s\n", ipAddr, pxy)
return
}
p.ipAddresses = append(p.ipAddresses, ipAddr)
// Sort the proxy into the right groups.
if IsSmartproxy(pxy) {
p.smartproxyOnlineProxies = append(p.smartproxyOnlineProxies, pxy)
for _, d := range testSmartproxyBV3HIFix {
_, bv3hiErr := sendRequestThroughProxy(pxy, d)
if bv3hiErr != nil {
fmt.Printf("Smartproxy %s failed: %s\n", pxy, bv3hiErr)
p.smartproxyBrokenProxies = append(p.smartproxyBrokenProxies, pxy)
}
}
} else {
p.ourOnlineProxies = append(p.ourOnlineProxies, pxy)
}
}(pxy)
}
wg.Wait()
if !started {
p.ourOnlineProxies = shuffle(p.ourOnlineProxies)
p.smartproxyOnlineProxies = shuffle(p.smartproxyOnlineProxies)
started = true
p.BalancerOnline.Done()
}
log.Printf("Our Endpoints Online: %d, Smartproxy Endpoints Online: %d, Smartproxy Broken Backends: %d, Total Online: %d\n",
len(p.ourOnlineProxies), len(p.smartproxyOnlineProxies), len(p.smartproxyBrokenProxies), len(p.ourOnlineProxies)+(len(p.smartproxyOnlineProxies)-len(p.smartproxyBrokenProxies)))
time.Sleep(60 * time.Second)
}
}
func getKeysFromMap(m map[string]string) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
return keys
}
func shuffle(vals []string) []string {
r := rand.New(rand.NewSource(time.Now().Unix()))
ret := make([]string, len(vals))
n := len(vals)
for i := 0; i < n; i++ {
randIndex := r.Intn(len(vals))
ret[i] = vals[randIndex]
vals = append(vals[:randIndex], vals[randIndex+1:]...)
}
return ret
}
func removeDuplicates(elements []string) []string {
encountered := map[string]bool{}
var result []string
for v := range elements {
if encountered[elements[v]] == true {
// Do not add duplicate.
} else {
// Record this element as an encountered element.
encountered[elements[v]] = true
// Append to result slice.
result = append(result, elements[v])
}
}
// Return the new slice.
return result
}