fix exception from proxy server when background thread is updating proxies, return the proxied status code instead of handling it on the server,add more fields to request logging
This commit is contained in:
parent
8cddcbcb2c
commit
dd1a880e41
|
@ -6,7 +6,8 @@ This is a simple proxy load balancer that will route requests to a cluster of pr
|
|||
This makes it easy to connect your clients to a large number of proxy servers without worrying about implementing
|
||||
anything special clientside.
|
||||
|
||||
HTTPS proxy servers are not supported.
|
||||
- Downstream HTTPS proxy servers are not supported.
|
||||
- This proxy server will transparently forward HTTPS requests without terminating them, meaning a self-signed certificate is not required.
|
||||
|
||||
## Install
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"slices"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -18,9 +19,13 @@ var (
|
|||
HeaderThirdpartyBypass = "Thirdparty-Bypass"
|
||||
)
|
||||
|
||||
func logProxyRequest(remoteAddr string, proxyHost string, targetHost string, returnCode *int, proxyConnectMode string, elapsedMs int64) {
|
||||
log.Infof(`%s -> %s -> %s -> %d -- %s -- %d ms`, remoteAddr, proxyHost, targetHost, *returnCode, proxyConnectMode, elapsedMs)
|
||||
}
|
||||
|
||||
func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter, req *http.Request) (string, string, string, string, *url.URL, error) {
|
||||
if p.BalancerOnline.GetCount() != 0 {
|
||||
errStr := "balancer is not ready"
|
||||
errStr := "proxy is not ready"
|
||||
http.Error(w, errStr, http.StatusServiceUnavailable)
|
||||
return "", "", "", "", nil, errors.New(errStr)
|
||||
}
|
||||
|
@ -72,6 +77,7 @@ func (p *ForwardProxyCluster) validateRequestAndGetProxy(w http.ResponseWriter,
|
|||
}
|
||||
|
||||
func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.Request) {
|
||||
requestStartTime := time.Now()
|
||||
remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr)
|
||||
_, proxyUser, proxyPass, proxyHost, parsedProxyUrl, err := p.validateRequestAndGetProxy(w, req)
|
||||
if err != nil {
|
||||
|
@ -79,7 +85,10 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
|
|||
log.Debugf(`%s -> %s -- HTTP -- Rejecting request: "%s"`, remoteAddr, proxyHost, err)
|
||||
return
|
||||
}
|
||||
defer log.Infof(`%s -> %s -> %s -- HTTP`, remoteAddr, proxyHost, req.Host)
|
||||
var returnCode *int
|
||||
returnCode = new(int)
|
||||
*returnCode = -1
|
||||
defer logProxyRequest(remoteAddr, proxyHost, req.Host, returnCode, "HTTP", time.Since(requestStartTime).Milliseconds())
|
||||
|
||||
parsedProxyUrl.Scheme = "http"
|
||||
if proxyUser != "" && proxyPass != "" {
|
||||
|
@ -109,6 +118,7 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
|
|||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
*returnCode = resp.StatusCode
|
||||
|
||||
copyHeader(w.Header(), resp.Header)
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
|
@ -116,6 +126,7 @@ func (p *ForwardProxyCluster) proxyHttpConnect(w http.ResponseWriter, req *http.
|
|||
}
|
||||
|
||||
func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http.Request) {
|
||||
requestStartTime := time.Now()
|
||||
remoteAddr, _, _ := net.SplitHostPort(req.RemoteAddr)
|
||||
targetHost, _, _ := net.SplitHostPort(req.Host)
|
||||
_, proxyUser, proxyPass, proxyHost, _, err := p.validateRequestAndGetProxy(w, req)
|
||||
|
@ -124,9 +135,12 @@ func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http
|
|||
log.Debugf(`%s -> %s -- CONNECT -- Rejecting request: "%s"`, remoteAddr, proxyHost, err)
|
||||
return
|
||||
}
|
||||
defer log.Infof(`%s -> %s -> %s -- CONNECT`, remoteAddr, proxyHost, targetHost)
|
||||
var returnCode *int
|
||||
returnCode = new(int)
|
||||
*returnCode = -1
|
||||
defer logProxyRequest(remoteAddr, proxyHost, targetHost, returnCode, "CONNECT", time.Since(requestStartTime).Milliseconds())
|
||||
|
||||
// Connect to the downstream proxy server instead of the target host
|
||||
// Start a connection to the downstream proxy server.
|
||||
proxyConn, err := net.DialTimeout("tcp", proxyHost, config.GetConfig().ProxyConnectTimeout)
|
||||
if err != nil {
|
||||
log.Errorf(`Failed to dial proxy %s - %s`, proxyHost, err)
|
||||
|
@ -145,26 +159,14 @@ func (p *ForwardProxyCluster) proxyHttpsConnect(w http.ResponseWriter, req *http
|
|||
return
|
||||
}
|
||||
resp, err := http.ReadResponse(bufio.NewReader(proxyConn), req)
|
||||
if err != nil || resp.StatusCode != 200 {
|
||||
var errStr string
|
||||
if err != nil {
|
||||
// `err` may be nil
|
||||
errStr = err.Error()
|
||||
}
|
||||
statusCode := -1
|
||||
if resp != nil {
|
||||
statusCode = resp.StatusCode
|
||||
}
|
||||
log.Errorf(`Failed to CONNECT to %s using proxy %s. Status code : %d - "%s"`, req.Host, proxyHost, statusCode, errStr)
|
||||
|
||||
// Return the original status code.
|
||||
returnStatusCode := http.StatusServiceUnavailable
|
||||
if statusCode != -1 {
|
||||
returnStatusCode = statusCode
|
||||
}
|
||||
http.Error(w, "failed to execute request to downstream", returnStatusCode)
|
||||
if resp == nil {
|
||||
log.Errorf(`Failed to CONNECT to %s using proxy %s: %s`, req.Host, proxyHost, err)
|
||||
http.Error(w, "failed to execute request to downstream", http.StatusServiceUnavailable)
|
||||
return
|
||||
} else if err != nil {
|
||||
log.Warnf(`Error while performing CONNECT to %s using proxy %s: %s`, req.Host, proxyHost, err)
|
||||
}
|
||||
*returnCode = resp.StatusCode
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
hj, ok := w.(http.Hijacker)
|
||||
|
|
|
@ -37,10 +37,17 @@ func NewForwardProxyCluster() *ForwardProxyCluster {
|
|||
|
||||
func (p *ForwardProxyCluster) cycleProxy(validProxies []string, currentProxy *int32) string {
|
||||
// Just round robin
|
||||
currProxy := atomic.LoadInt32(currentProxy)
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
currProxy := int(atomic.LoadInt32(currentProxy))
|
||||
if currProxy > len(validProxies)-1 {
|
||||
// This might happen if the background thread is currently making changes to the list of proxies.
|
||||
// Just set it to the current length of the proxies array and move on.
|
||||
currProxy = len(validProxies) - 1
|
||||
}
|
||||
downstreamProxy := validProxies[currProxy]
|
||||
newCurrentProxy := (currProxy + 1) % int32(len(validProxies))
|
||||
atomic.StoreInt32(currentProxy, newCurrentProxy)
|
||||
newCurrentProxy := (currProxy + 1) % len(validProxies)
|
||||
atomic.StoreInt32(currentProxy, int32(newCurrentProxy))
|
||||
return downstreamProxy
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue