diff --git a/.gitignore b/.gitignore index 9dda486..0bd7aa2 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ config.py dist/ test.go +config.yml +config.yaml # ---> Python # Byte-compiled / optimized / DLL files diff --git a/build.sh b/build.sh index bd1308b..c4651bc 100755 --- a/build.sh +++ b/build.sh @@ -9,10 +9,10 @@ else fi mkdir -p "$SCRIPT_DIR/dist" -rm "$SCRIPT_DIR"/dist/crazyfs-* &> /dev/null +rm "$SCRIPT_DIR"/dist/* &> /dev/null BUILDARGS="$(uname)-$(uname -p)" -OUTPUTFILE="$SCRIPT_DIR/dist/crazyfs-$VERSION-$BUILDARGS" +OUTPUTFILE="$SCRIPT_DIR/dist/proxy-loadbalancer-$VERSION-$BUILDARGS" cd "$SCRIPT_DIR/src" || exit 1 go mod tidy diff --git a/config.example.yml b/config.example.yml new file mode 100644 index 0000000..d5e9ef9 --- /dev/null +++ b/config.example.yml @@ -0,0 +1,30 @@ +# Port to run on. +http_port: 9000 + +# How many proxies will be checked at once? +proxy_checkers: 50 + +# URL to get a proxy's IP. +ip_checker_url: https://api.ipify.org + +# Connection timeout for the proxies in seconds. +proxy_connect_timeout: 10 + +# Your proxies. +proxy_pool_ours: + - http://1.2.3.4:3128 + - http://5.6.7.8:3128 + +# Your third-party proxies. +proxy_pool_thirdparty: + - http://username:password@example:10001 + +# URL used to test third-party proxies against. +# Some proxies just don't work on some domains. If a proxy fails this check it will be marked as +# "unhealthy" and removed from the general pool. +thirdparty_test_urls: + - https://files.catbox.moe/1hvrlj.png + +# Don't route requests for these domains through the third-party proxies. +thirdparty_bypass_domains: + - twitter.com \ No newline at end of file diff --git a/src/config/config.go b/src/config/config.go new file mode 100644 index 0000000..e70e14a --- /dev/null +++ b/src/config/config.go @@ -0,0 +1,82 @@ +package config + +import ( + "errors" + "github.com/spf13/viper" + "time" +) + +// The global, read-only config variable. +var cfg *Config + +type Config struct { + HTTPPort string + IpCheckerURL string + MaxProxyCheckers int + ProxyConnectTimeout time.Duration + ProxyPoolOurs []string + ProxyPoolThirdparty []string + ThirdpartyTestUrls []string + ThirdpartyBypassDomains []string +} + +func SetConfig(configFile string) (*Config, error) { + // Only allow the config to be set once. + if cfg != nil { + panic("Config has already been set!") + } + + viper.SetConfigFile(configFile) + viper.SetDefault("http_port", "5000") + viper.SetDefault("proxy_checkers", 50) + viper.SetDefault("proxy_connect_timeout", 10) + viper.SetDefault("proxy_pool_ours", make([]string, 0)) + viper.SetDefault("proxy_pool_thirdparty", make([]string, 0)) + viper.SetDefault("thirdparty_test_urls", make([]string, 0)) + viper.SetDefault("thirdparty_bypass_domains", make([]string, 0)) + + err := viper.ReadInConfig() + if err != nil { + return nil, err + } + + config := &Config{ + HTTPPort: viper.GetString("http_port"), + IpCheckerURL: viper.GetString("ip_checker_url"), + MaxProxyCheckers: viper.GetInt("proxy_checkers"), + ProxyPoolOurs: viper.GetStringSlice("proxy_pool_ours"), + ProxyPoolThirdparty: viper.GetStringSlice("proxy_pool_thirdparty"), + ThirdpartyTestUrls: viper.GetStringSlice("thirdparty_test_urls"), + ThirdpartyBypassDomains: viper.GetStringSlice("thirdparty_bypass_domains"), + } + + if config.IpCheckerURL == "" { + return nil, errors.New("ip_checker_url is required") + } + + timeout := viper.GetInt("proxy_connect_timeout") + if timeout <= 0 { + return nil, errors.New("proxy_connect_timeout must be greater than 0") + } + config.ProxyConnectTimeout = time.Duration(timeout) * time.Second + + proxyPoolOursErr := validateProxies(config.ProxyPoolOurs) + if proxyPoolOursErr != nil { + return nil, proxyPoolOursErr + } + + proxyPoolThirdpartyErr := validateProxies(config.ProxyPoolThirdparty) + if proxyPoolThirdpartyErr != nil { + return nil, proxyPoolThirdpartyErr + } + + cfg = config + return config, nil +} + +func GetConfig() *Config { + if cfg == nil { + panic("Config has not been set!") + } + return cfg +} diff --git a/src/config/validate.go b/src/config/validate.go index 2c9ce50..01afbd9 100644 --- a/src/config/validate.go +++ b/src/config/validate.go @@ -6,10 +6,10 @@ import ( "strings" ) -func ValidateProxies(proxies []string) error { +func validateProxies(proxies []string) error { for _, proxy := range proxies { - if !strings.HasPrefix("http://", proxy) { - return errors.New(fmt.Sprintf(`proxy "%s" must start with http://`, proxy)) + if !strings.HasPrefix(proxy, "http://") { + return errors.New(fmt.Sprintf(`Proxy URLs must start with "http://" - "%s"`, proxy)) } } return nil diff --git a/src/go.mod b/src/go.mod index 50e1fae..588c36b 100644 --- a/src/go.mod +++ b/src/go.mod @@ -3,8 +3,29 @@ module main go 1.22.1 require ( - github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5 + github.com/sirupsen/logrus v1.9.3 + github.com/spf13/viper v1.18.2 golang.org/x/sync v0.7.0 ) -require github.com/elazarl/goproxy/ext v0.0.0-20231117061959-7cc037d33fb5 // indirect +require ( + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect + github.com/magiconair/properties v1.8.7 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/sagikazarmark/locafero v0.4.0 // indirect + github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/src/go.sum b/src/go.sum index b895067..9fdec7c 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1,8 +1,75 @@ -github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5 h1:m62nsMU279qRD9PQSWD1l66kmkXzuYcnVJqL4XLeV2M= -github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= -github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= -github.com/elazarl/goproxy/ext v0.0.0-20231117061959-7cc037d33fb5 h1:iGoePcl8bIDJxxRAL2Q4E4Rt35z5m917RJb8lAvdrQw= -github.com/elazarl/goproxy/ext v0.0.0-20231117061959-7cc037d33fb5/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= -github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= +github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= +github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= +github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= +github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ= +github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/logging/logging.go b/src/logging/logging.go new file mode 100644 index 0000000..af79a8b --- /dev/null +++ b/src/logging/logging.go @@ -0,0 +1,27 @@ +package logging + +import ( + "github.com/sirupsen/logrus" +) + +var log *logrus.Logger + +func init() { + log = logrus.New() + + // Set log output format + customFormatter := new(logrus.TextFormatter) + customFormatter.TimestampFormat = "2006-01-02 15:04:05" + customFormatter.FullTimestamp = true + log.SetFormatter(customFormatter) +} + +// InitLogger initializes the global logger with the specified log level +func InitLogger(logLevel logrus.Level) { + log.SetLevel(logLevel) +} + +// GetLogger returns the global logger instance +func GetLogger() *logrus.Logger { + return log +} diff --git a/src/proxy-loadbalancer.go b/src/proxy-loadbalancer.go index d054b92..1f17da5 100644 --- a/src/proxy-loadbalancer.go +++ b/src/proxy-loadbalancer.go @@ -3,10 +3,13 @@ package main import ( "flag" "fmt" - "log" + "github.com/sirupsen/logrus" + "main/config" + "main/logging" "main/proxy" "net/http" "os" + "path/filepath" "runtime/debug" ) @@ -48,13 +51,44 @@ func main() { os.Exit(0) } + if cliArgs.debug { + logging.InitLogger(logrus.DebugLevel) + } else { + logging.InitLogger(logrus.InfoLevel) + } + log := logging.GetLogger() + log.Debugln("Initializing...") + + if cliArgs.configFile == "" { + exePath, err := os.Executable() + if err != nil { + panic(err) + } + exeDir := filepath.Dir(exePath) + + if _, err := os.Stat(filepath.Join(exeDir, "config.yml")); err == nil { + if _, err := os.Stat(filepath.Join(exeDir, "config.yaml")); err == nil { + log.Fatalln("Both config.yml and config.yaml exist in the executable directory. Please specify one with the --config flag.") + } + cliArgs.configFile = filepath.Join(exeDir, "config.yml") + } else if _, err := os.Stat(filepath.Join(exeDir, "config.yaml")); err == nil { + cliArgs.configFile = filepath.Join(exeDir, "config.yaml") + } else { + log.Fatalln("No config file found in the executable directory. Please provide one with the --config flag.") + } + } + configData, err := config.SetConfig(cliArgs.configFile) + if err != nil { + log.Fatalf(`Failed to load config: %s`, err) + } + proxyCluster := proxy.NewForwardProxyCluster() go proxyCluster.ValidateProxiesThread() proxyCluster.BalancerOnline.Wait() go func() { - log.Fatal(http.ListenAndServe(":5000", proxyCluster)) + log.Fatal(http.ListenAndServe(":"+configData.HTTPPort, proxyCluster)) }() - fmt.Println("Server started!") + log.Infof("-> Server started on 0.0.0.0:%s and accepting requests <-", configData.HTTPPort) select {} } diff --git a/src/proxy/checkProxy.go b/src/proxy/checkProxy.go index e754a55..de5367d 100644 --- a/src/proxy/checkProxy.go +++ b/src/proxy/checkProxy.go @@ -3,27 +3,26 @@ package proxy import ( "fmt" "io" + "main/config" "net/http" "net/url" - "time" ) -func sendRequestThroughProxy(proxyUrl string, targetURL string) (string, error) { - parsedProxyUrl, err := url.Parse(proxyUrl) +func sendRequestThroughProxy(pxy string, targetURL string) (string, error) { + proxyUser, proxyPass, _, parsedProxyUrl, err := splitProxyURL(pxy) if err != nil { return "", err } - if IsSmartproxy(proxyUrl) { - // Set the username and password for proxy authentication if Smartproxy - parsedProxyUrl.User = url.UserPassword(smartproxyUsername, smartproxyPassword) + if proxyUser != "" && proxyPass != "" { + parsedProxyUrl.User = url.UserPassword(proxyUser, proxyPass) } transport := &http.Transport{ Proxy: http.ProxyURL(parsedProxyUrl), } client := &http.Client{ Transport: transport, - Timeout: time.Second * 10, + Timeout: config.GetConfig().ProxyConnectTimeout, } response, err := client.Get(targetURL) diff --git a/src/proxy/handleConnect.go b/src/proxy/handleConnect.go index af151b0..6398a6c 100644 --- a/src/proxy/handleConnect.go +++ b/src/proxy/handleConnect.go @@ -2,28 +2,95 @@ package proxy import ( "bufio" + "encoding/base64" + "errors" "fmt" "io" - "log" + "main/config" "net" "net/http" "net/url" - "strings" - "sync/atomic" + "slices" ) -func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.Request) { - proxyURLParsed, _ := url.Parse(p.getProxy()) - proxyURLParsed.Scheme = "http" +func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter, req *http.Request) (string, string, string, string, *url.URL, error) { + if p.BalancerOnline.GetCount() != 0 { + errStr := "balancer is not ready" + http.Error(w, errStr, http.StatusServiceUnavailable) + return "", "", "", "", nil, errors.New(errStr) + } + p.mu.RLock() + defer p.mu.RUnlock() + + if len(p.ourOnlineProxies) == 0 && len(p.thirdpartyOnlineProxies) == 0 { + errStr := "no valid backends" + http.Error(w, errStr, http.StatusServiceUnavailable) + return "", "", "", "", nil, errors.New(errStr) + } + + headerIncludeBrokenThirdparty := req.Header.Get("Thirdparty-Include-Broken") + req.Header.Del("Thirdparty-Include-Broken") + headerBypassThirdparty := req.Header.Get("Thirdparty-Bypass") + req.Header.Del("Thirdparty-Bypass") + if headerBypassThirdparty != "" && headerIncludeBrokenThirdparty != "" { + errStr := "duplicate options headers detected, rejecting request" + http.Error(w, errStr, http.StatusBadRequest) + return "", "", "", "", nil, errors.New(errStr) + } + + var selectedProxy string + if slices.Contains(config.GetConfig().ThirdpartyBypassDomains, req.URL.Hostname()) { + selectedProxy = p.getProxyFromOurs() + } else { + if headerIncludeBrokenThirdparty != "" { + selectedProxy = p.getProxyFromAllWithBroken() + } else if headerBypassThirdparty != "" { + selectedProxy = p.getProxyFromOurs() + } else { + selectedProxy = p.getProxyFromAll() + } + } + if selectedProxy == "" { + panic("selected proxy was empty!") + } + + proxyUser, proxyPass, proxyHost, parsedProxyUrl, err := splitProxyURL(selectedProxy) + if err != nil { + errStr := "failed to parse downstream proxy assignment" + http.Error(w, errStr, http.StatusBadRequest) + return "", "", "", "", nil, errors.New(fmt.Sprintf(`%s: %s`, errStr, err.Error())) + } + + return selectedProxy, proxyUser, proxyPass, proxyHost, parsedProxyUrl, nil + +} + +func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.Request) { + _, proxyUser, proxyPass, proxyHost, parsedProxyUrl, err := p.validateRequestAndGetProxy(w, req) + if err != nil { + // Error has already been handled, just log and return. + log.Errorf(`Failed to validate and get proxy: "%s"`, err) + return + } + remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr) + defer log.Debugf(`%s -> %s -> %s -- HTTP`, remoteAddr, proxyHost, req.Host) + + parsedProxyUrl.Scheme = "http" + if proxyUser != "" && proxyPass != "" { + parsedProxyUrl.User = url.UserPassword(proxyUser, proxyPass) + } client := &http.Client{ Transport: &http.Transport{ - Proxy: http.ProxyURL(proxyURLParsed), + Proxy: http.ProxyURL(parsedProxyUrl), }, + Timeout: config.GetConfig().ProxyConnectTimeout, } + proxyReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + log.Errorf(`Failed to make %s request to "%s": "%s"`, req.Method, req.URL.String(), err) + http.Error(w, "failed to make request to downstream", http.StatusInternalServerError) return } @@ -32,7 +99,8 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http. resp, err := client.Do(proxyReq) if err != nil { - http.Error(w, err.Error(), http.StatusServiceUnavailable) + log.Errorf(`Failed to execute %s request to "%s": "%s"`, req.Method, req.URL.String(), err) + http.Error(w, "failed to execute request to downstream", http.StatusServiceUnavailable) return } defer resp.Body.Close() @@ -42,49 +110,72 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http. io.Copy(w, resp.Body) } -func (p *ForwardProxyCluster) proxyHTTPSConnect(w http.ResponseWriter, req *http.Request) { - log.Printf("CONNECT requested to %v (from %v)", req.Host, req.RemoteAddr) - - allValidProxies := append(p.ourOnlineProxies, p.smartproxyOnlineProxies...) - currentProxy := atomic.LoadInt32(&p.CurrentProxy) - downstreamProxy := allValidProxies[currentProxy] - downstreamProxy = strings.Replace(downstreamProxy, "http://", "", -1) - downstreamProxy = strings.Replace(downstreamProxy, "https://", "", -1) - newCurrentProxy := (currentProxy + 1) % int32(len(testProxies)) - atomic.StoreInt32(&p.CurrentProxy, newCurrentProxy) +func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http.Request) { + _, proxyUser, proxyPass, proxyHost, _, err := p.validateRequestAndGetProxy(w, req) + if err != nil { + // Error has already been handled, just log and return. + log.Errorf(`Failed to validate and get proxy: "%s"`, err) + return + } + remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr) + targetHost, _, _ := net.SplitHostPort(req.Host) + defer log.Debugf(`%s -> %s -> %s -- CONNECT`, remoteAddr, proxyHost, targetHost) // Connect to the downstream proxy server instead of the target host - proxyConn, err := net.Dial("tcp", downstreamProxy) + proxyConn, err := net.DialTimeout("tcp", proxyHost, config.GetConfig().ProxyConnectTimeout) if err != nil { - log.Println("failed to dial to proxy", downstreamProxy, err) - http.Error(w, err.Error(), http.StatusServiceUnavailable) + log.Errorf(`Failed to dial proxy %s - %s`, proxyHost, err) + http.Error(w, "failed to make request to downstream", http.StatusServiceUnavailable) return } + // Proxy authentication + auth := fmt.Sprintf("%s:%s", proxyUser, proxyPass) + encodedAuth := base64.StdEncoding.EncodeToString([]byte(auth)) + authHeader := "Proxy-Authorization: Basic " + encodedAuth + // Send a new CONNECT request to the downstream proxy - _, err = fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", req.Host, req.Host) + _, err = fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\n%s\r\n\r\n", req.Host, req.Host, authHeader) if err != nil { return } resp, err := http.ReadResponse(bufio.NewReader(proxyConn), req) if err != nil || resp.StatusCode != 200 { - log.Println("failed to CONNECT to target", req.Host) - http.Error(w, err.Error(), http.StatusServiceUnavailable) + var errStr string + if err != nil { + // `err` may be nil + errStr = err.Error() + } + statusCode := -1 + if resp != nil { + statusCode = resp.StatusCode + } + log.Errorf(`Failed to CONNECT to %s using proxy %s. Status code : %d - "%s"`, req.Host, proxyHost, statusCode, errStr) + + // Return the original status code. + returnStatusCode := http.StatusServiceUnavailable + if statusCode != -1 { + returnStatusCode = statusCode + } + http.Error(w, "failed to execute request to downstream", returnStatusCode) return } w.WriteHeader(http.StatusOK) hj, ok := w.(http.Hijacker) if !ok { - log.Fatal("http server doesn't support hijacking connection") + log.Errorf(`Failed to forward connection to %s using proxy %s`, req.Host, proxyHost) + http.Error(w, "failed to forward connection to downstream", http.StatusServiceUnavailable) + return } clientConn, _, err := hj.Hijack() if err != nil { - log.Fatal("http hijacking failed") + log.Errorf(`Failed to execute connection forwarding to %s using proxy %s`, req.Host, proxyHost) + http.Error(w, "failed to execute connection forwarding to downstream", http.StatusServiceUnavailable) + return } - log.Println("tunnel established") go tunnelConn(proxyConn, clientConn) go tunnelConn(clientConn, proxyConn) } diff --git a/src/proxy/proxy.go b/src/proxy/proxy.go index 3b01b80..53a6f3f 100644 --- a/src/proxy/proxy.go +++ b/src/proxy/proxy.go @@ -1,35 +1,67 @@ package proxy import ( + "github.com/sirupsen/logrus" + "main/logging" + "slices" "sync" "sync/atomic" ) type ForwardProxyCluster struct { - // TODO: mutex rwlock - ourOnlineProxies []string - smartproxyOnlineProxies []string - smartproxyBrokenProxies []string - ipAddresses []string - BalancerOnline sync.WaitGroup - CurrentProxy int32 + mu sync.RWMutex + ourOnlineProxies []string + thirdpartyOnlineProxies []string + thirdpartyBrokenProxies []string + ipAddresses []string + BalancerOnline WaitGroupCountable + currentProxyAll int32 + currentProxyOurs int32 + currentProxyAllWithBroken int32 } -// TODO: move all smartproxy things to "thirdparty" +var log *logrus.Logger + +func init() { + log = logging.GetLogger() +} func NewForwardProxyCluster() *ForwardProxyCluster { p := &ForwardProxyCluster{} - atomic.StoreInt32(&p.CurrentProxy, 0) + atomic.StoreInt32(&p.currentProxyAll, 0) + atomic.StoreInt32(&p.currentProxyOurs, 0) + atomic.StoreInt32(&p.currentProxyAllWithBroken, 0) p.BalancerOnline.Add(1) return p } -func (p *ForwardProxyCluster) getProxy() string { +func (p *ForwardProxyCluster) cycleProxy(validProxies []string, currentProxy *int32) string { // Just round robin - allValidProxies := append(p.ourOnlineProxies, p.smartproxyOnlineProxies...) - currentProxy := atomic.LoadInt32(&p.CurrentProxy) - downstreamProxy := allValidProxies[currentProxy] - newCurrentProxy := (currentProxy + 1) % int32(len(testProxies)) - atomic.StoreInt32(&p.CurrentProxy, newCurrentProxy) + currProxy := atomic.LoadInt32(currentProxy) + downstreamProxy := validProxies[currProxy] + newCurrentProxy := (currProxy + 1) % int32(len(validProxies)) + atomic.StoreInt32(currentProxy, newCurrentProxy) return downstreamProxy } + +func (p *ForwardProxyCluster) getProxyFromAll() string { + p.mu.RLock() + defer p.mu.RUnlock() + validProxies := removeDuplicates(append(p.ourOnlineProxies, p.thirdpartyOnlineProxies...)) + return p.cycleProxy(validProxies, &p.currentProxyAll) +} + +func (p *ForwardProxyCluster) getProxyFromOurs() string { + p.mu.RLock() + defer p.mu.RUnlock() + validProxies := p.ourOnlineProxies + return p.cycleProxy(validProxies, &p.currentProxyOurs) +} + +func (p *ForwardProxyCluster) getProxyFromAllWithBroken() string { + p.mu.RLock() + defer p.mu.RUnlock() + validProxies := removeDuplicates(slices.Concat(p.ourOnlineProxies, p.thirdpartyBrokenProxies, p.thirdpartyOnlineProxies)) + return p.cycleProxy(validProxies, &p.currentProxyAllWithBroken) + +} diff --git a/src/proxy/serve.go b/src/proxy/serve.go index 44e49a0..e0c8945 100644 --- a/src/proxy/serve.go +++ b/src/proxy/serve.go @@ -2,20 +2,19 @@ package proxy import ( "fmt" - "log" "net/http" ) func (p *ForwardProxyCluster) ServeHTTP(w http.ResponseWriter, req *http.Request) { if req.Method == http.MethodConnect { // HTTPS - p.proxyHTTPSConnect(w, req) + p.proxyHttpsConnect(w, req) } else { // HTTP if req.URL.Scheme != "http" { msg := fmt.Sprintf(`unsupported protocal "%s"`, req.URL.Scheme) + log.Errorf(msg) http.Error(w, msg, http.StatusBadRequest) - log.Println(msg) return } p.proxyHttpConnect(w, req) diff --git a/src/proxy/string.go b/src/proxy/string.go index ca161ed..6154b90 100644 --- a/src/proxy/string.go +++ b/src/proxy/string.go @@ -1,14 +1,36 @@ package proxy import ( + "main/config" "net/url" + "slices" "strings" ) -func IsSmartproxy(proxyUrl string) bool { - parsedProxyUrl, err := url.Parse(proxyUrl) +func splitProxyURL(proxyURL string) (string, string, string, *url.URL, error) { + u, err := url.Parse(proxyURL) if err != nil { - panic(err) + return "", "", "", nil, err } - return strings.Split(parsedProxyUrl.Host, ":")[0] == "dc.smartproxy.com" + + var username, password string + if u.User != nil { + username = u.User.Username() + password, _ = u.User.Password() + } + + host := u.Host + + return username, password, host, u, nil +} + +func isThirdparty(proxyUrl string) bool { + return slices.Contains(config.GetConfig().ProxyPoolThirdparty, proxyUrl) +} + +func stripHTTP(url string) string { + var newStr string + newStr = strings.Replace(url, "http://", "", -1) + newStr = strings.Replace(newStr, "https://", "", -1) + return newStr } diff --git a/src/proxy/validate.go b/src/proxy/threads.go similarity index 54% rename from src/proxy/validate.go rename to src/proxy/threads.go index 5815594..6685d76 100644 --- a/src/proxy/validate.go +++ b/src/proxy/threads.go @@ -2,9 +2,8 @@ package proxy import ( "context" - "fmt" "golang.org/x/sync/semaphore" - "log" + "main/config" "math/rand" "slices" "sync" @@ -12,19 +11,17 @@ import ( ) func (p *ForwardProxyCluster) ValidateProxiesThread() { - log.Println("Doing initial backend check, please wait...") + log.Infoln("Doing initial backend check, please wait...") started := false - - // TODO: config value - var sem = semaphore.NewWeighted(int64(50)) + var sem = semaphore.NewWeighted(int64(config.GetConfig().MaxProxyCheckers)) ctx := context.TODO() for { // TODO: need to have these be temp vars and then copy them over when finished - allProxies := removeDuplicates(append(testProxies, testSmartproxyPool...)) + allProxies := removeDuplicates(append(config.GetConfig().ProxyPoolOurs, config.GetConfig().ProxyPoolThirdparty...)) p.ourOnlineProxies = make([]string, 0) - p.smartproxyOnlineProxies = make([]string, 0) - p.smartproxyBrokenProxies = make([]string, 0) + p.thirdpartyOnlineProxies = make([]string, 0) + p.thirdpartyBrokenProxies = make([]string, 0) p.ipAddresses = make([]string, 0) var wg sync.WaitGroup @@ -35,62 +32,69 @@ func (p *ForwardProxyCluster) ValidateProxiesThread() { defer wg.Done() if err := sem.Acquire(ctx, 1); err != nil { - fmt.Printf("Failed to acquire semaphore: %v\n", err) + log.Errorf("Validate - failed to acquire semaphore: %v\n", err) return } defer sem.Release(1) + _, _, proxyHost, _, err := splitProxyURL(pxy) + if err != nil { + log.Errorf(`Invalid proxy "%s"`, pxy) + return + } + // Test the proxy. - ipAddr, testErr := sendRequestThroughProxy(pxy, testTargetUrl) + ipAddr, testErr := sendRequestThroughProxy(pxy, config.GetConfig().IpCheckerURL) if testErr != nil { - fmt.Printf("Proxy %s failed: %s\n", pxy, testErr) + log.Warnf("Validate - proxy %s failed: %s", proxyHost, testErr) return } if slices.Contains(p.ipAddresses, ipAddr) { - fmt.Printf("Duplicate IP Address %s for proxy %s\n", ipAddr, pxy) + log.Warnf("Validate - duplicate IP Address %s for proxy %s", ipAddr, proxyHost) return } p.ipAddresses = append(p.ipAddresses, ipAddr) // Sort the proxy into the right groups. - if IsSmartproxy(pxy) { - p.smartproxyOnlineProxies = append(p.smartproxyOnlineProxies, pxy) - for _, d := range testSmartproxyBV3HIFix { + if isThirdparty(pxy) { + p.mu.Lock() + p.thirdpartyOnlineProxies = append(p.thirdpartyOnlineProxies, pxy) + p.mu.Unlock() + + for _, d := range config.GetConfig().ThirdpartyTestUrls { _, bv3hiErr := sendRequestThroughProxy(pxy, d) if bv3hiErr != nil { - fmt.Printf("Smartproxy %s failed: %s\n", pxy, bv3hiErr) - p.smartproxyBrokenProxies = append(p.smartproxyBrokenProxies, pxy) + log.Debugf("Validate - Third-party %s failed: %s\n", proxyHost, bv3hiErr) + p.thirdpartyBrokenProxies = append(p.thirdpartyBrokenProxies, pxy) } } } else { + p.mu.Lock() p.ourOnlineProxies = append(p.ourOnlineProxies, pxy) + p.mu.Unlock() } }(pxy) } wg.Wait() if !started { + p.mu.Lock() p.ourOnlineProxies = shuffle(p.ourOnlineProxies) - p.smartproxyOnlineProxies = shuffle(p.smartproxyOnlineProxies) + p.thirdpartyOnlineProxies = shuffle(p.thirdpartyOnlineProxies) + p.mu.Unlock() started = true p.BalancerOnline.Done() } - log.Printf("Our Endpoints Online: %d, Smartproxy Endpoints Online: %d, Smartproxy Broken Backends: %d, Total Online: %d\n", - len(p.ourOnlineProxies), len(p.smartproxyOnlineProxies), len(p.smartproxyBrokenProxies), len(p.ourOnlineProxies)+(len(p.smartproxyOnlineProxies)-len(p.smartproxyBrokenProxies))) + p.mu.RLock() + log.Infof("Our Endpoints Online: %d, Third-Party Endpoints Online: %d, Third-Party Broken Endpoints: %d, Total Valid: %d\n", + len(p.ourOnlineProxies), len(p.thirdpartyOnlineProxies), len(p.thirdpartyBrokenProxies), len(p.ourOnlineProxies)+(len(p.thirdpartyOnlineProxies)-len(p.thirdpartyBrokenProxies))) + p.mu.RUnlock() time.Sleep(60 * time.Second) } } -func getKeysFromMap(m map[string]string) []string { - keys := make([]string, 0, len(m)) - for k := range m { - keys = append(keys, k) - } - return keys -} - func shuffle(vals []string) []string { r := rand.New(rand.NewSource(time.Now().Unix())) ret := make([]string, len(vals)) diff --git a/src/proxy/waitgroup.go b/src/proxy/waitgroup.go new file mode 100644 index 0000000..3667bcb --- /dev/null +++ b/src/proxy/waitgroup.go @@ -0,0 +1,25 @@ +package proxy + +import ( + "sync" + "sync/atomic" +) + +type WaitGroupCountable struct { + sync.WaitGroup + count int64 +} + +func (wg *WaitGroupCountable) Add(delta int) { + atomic.AddInt64(&wg.count, int64(delta)) + wg.WaitGroup.Add(delta) +} + +func (wg *WaitGroupCountable) Done() { + atomic.AddInt64(&wg.count, -1) + wg.WaitGroup.Done() +} + +func (wg *WaitGroupCountable) GetCount() int { + return int(atomic.LoadInt64(&wg.count)) +}