mirror of https://github.com/slackhq/nebula.git
Add more metrics (#450)
* Add more metrics This change adds the following counter metrics: Metrics to track packets dropped at the firewall: firewall.dropped.local_ip firewall.dropped.remote_ip firewall.dropped.no_rule Metrics to track handshake attempts that have been initiated and ones that have timed out (ones that have completed are tracked by the existing "handshakes" histogram). handshake_manager.initiated handshake_manager.timed_out Metrics to track when cached_packets are dropped because we run out of buffer space, and how many are sent once the handshake completes. hostinfo.cached_packets.dropped hostinfo.cached_packets.sent This change also notes how many cached packets we have when we log the final "Handshake received" message for either stage1 or stage2. * separate incoming/outgoing metrics * remove "allowed" firewall metrics We don't need these on the hot path; they aren't worth it. * don't need pointers here
This commit is contained in:
parent
db23fdf9bc
commit
44cb697552
40
firewall.go
40
firewall.go
|
@ -68,9 +68,18 @@ type Firewall struct {
|
||||||
rules string
|
rules string
|
||||||
rulesVersion uint16
|
rulesVersion uint16
|
||||||
|
|
||||||
trackTCPRTT bool
|
trackTCPRTT bool
|
||||||
metricTCPRTT metrics.Histogram
|
metricTCPRTT metrics.Histogram
|
||||||
l *logrus.Logger
|
incomingMetrics firewallMetrics
|
||||||
|
outgoingMetrics firewallMetrics
|
||||||
|
|
||||||
|
l *logrus.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
type firewallMetrics struct {
|
||||||
|
droppedLocalIP metrics.Counter
|
||||||
|
droppedRemoteIP metrics.Counter
|
||||||
|
droppedNoRule metrics.Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
type FirewallConntrack struct {
|
type FirewallConntrack struct {
|
||||||
|
@ -195,8 +204,19 @@ func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.D
|
||||||
UDPTimeout: UDPTimeout,
|
UDPTimeout: UDPTimeout,
|
||||||
DefaultTimeout: defaultTimeout,
|
DefaultTimeout: defaultTimeout,
|
||||||
localIps: localIps,
|
localIps: localIps,
|
||||||
metricTCPRTT: metrics.GetOrRegisterHistogram("network.tcp.rtt", nil, metrics.NewExpDecaySample(1028, 0.015)),
|
|
||||||
l: l,
|
l: l,
|
||||||
|
|
||||||
|
metricTCPRTT: metrics.GetOrRegisterHistogram("network.tcp.rtt", nil, metrics.NewExpDecaySample(1028, 0.015)),
|
||||||
|
incomingMetrics: firewallMetrics{
|
||||||
|
droppedLocalIP: metrics.GetOrRegisterCounter("firewall.incoming.dropped.local_ip", nil),
|
||||||
|
droppedRemoteIP: metrics.GetOrRegisterCounter("firewall.incoming.dropped.remote_ip", nil),
|
||||||
|
droppedNoRule: metrics.GetOrRegisterCounter("firewall.incoming.dropped.no_rule", nil),
|
||||||
|
},
|
||||||
|
outgoingMetrics: firewallMetrics{
|
||||||
|
droppedLocalIP: metrics.GetOrRegisterCounter("firewall.outgoing.dropped.local_ip", nil),
|
||||||
|
droppedRemoteIP: metrics.GetOrRegisterCounter("firewall.outgoing.dropped.remote_ip", nil),
|
||||||
|
droppedNoRule: metrics.GetOrRegisterCounter("firewall.outgoing.dropped.no_rule", nil),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,17 +405,20 @@ func (f *Firewall) Drop(packet []byte, fp FirewallPacket, incoming bool, h *Host
|
||||||
// Make sure remote address matches nebula certificate
|
// Make sure remote address matches nebula certificate
|
||||||
if remoteCidr := h.remoteCidr; remoteCidr != nil {
|
if remoteCidr := h.remoteCidr; remoteCidr != nil {
|
||||||
if remoteCidr.Contains(fp.RemoteIP) == nil {
|
if remoteCidr.Contains(fp.RemoteIP) == nil {
|
||||||
|
f.metrics(incoming).droppedRemoteIP.Inc(1)
|
||||||
return ErrInvalidRemoteIP
|
return ErrInvalidRemoteIP
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Simple case: Certificate has one IP and no subnets
|
// Simple case: Certificate has one IP and no subnets
|
||||||
if fp.RemoteIP != h.hostId {
|
if fp.RemoteIP != h.hostId {
|
||||||
|
f.metrics(incoming).droppedRemoteIP.Inc(1)
|
||||||
return ErrInvalidRemoteIP
|
return ErrInvalidRemoteIP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure we are supposed to be handling this local ip address
|
// Make sure we are supposed to be handling this local ip address
|
||||||
if f.localIps.Contains(fp.LocalIP) == nil {
|
if f.localIps.Contains(fp.LocalIP) == nil {
|
||||||
|
f.metrics(incoming).droppedLocalIP.Inc(1)
|
||||||
return ErrInvalidLocalIP
|
return ErrInvalidLocalIP
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -406,6 +429,7 @@ func (f *Firewall) Drop(packet []byte, fp FirewallPacket, incoming bool, h *Host
|
||||||
|
|
||||||
// We now know which firewall table to check against
|
// We now know which firewall table to check against
|
||||||
if !table.match(fp, incoming, h.ConnectionState.peerCert, caPool) {
|
if !table.match(fp, incoming, h.ConnectionState.peerCert, caPool) {
|
||||||
|
f.metrics(incoming).droppedNoRule.Inc(1)
|
||||||
return ErrNoMatchingRule
|
return ErrNoMatchingRule
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,6 +439,14 @@ func (f *Firewall) Drop(packet []byte, fp FirewallPacket, incoming bool, h *Host
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Firewall) metrics(incoming bool) firewallMetrics {
|
||||||
|
if incoming {
|
||||||
|
return f.incomingMetrics
|
||||||
|
} else {
|
||||||
|
return f.outgoingMetrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Destroy cleans up any known cyclical references so the object can be freed by the GC. This should be called if a new
|
// Destroy cleans up any known cyclical references so the object can be freed by the GC. This should be called if a new
|
||||||
// firewall object is created
|
// firewall object is created
|
||||||
func (f *Firewall) Destroy() {
|
func (f *Firewall) Destroy() {
|
||||||
|
|
|
@ -268,10 +268,11 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, packet []byte, h *Header) {
|
||||||
WithField("fingerprint", fingerprint).
|
WithField("fingerprint", fingerprint).
|
||||||
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
|
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
|
||||||
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
|
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
|
||||||
|
WithField("sentCachedPackets", len(hostinfo.packetStore)).
|
||||||
Info("Handshake message sent")
|
Info("Handshake message sent")
|
||||||
}
|
}
|
||||||
|
|
||||||
hostinfo.handshakeComplete(f.l)
|
hostinfo.handshakeComplete(f.l, f.cachedPacketMetrics)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -391,6 +392,7 @@ func ixHandshakeStage2(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [
|
||||||
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
|
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
|
||||||
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
|
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
|
||||||
WithField("durationNs", duration).
|
WithField("durationNs", duration).
|
||||||
|
WithField("sentCachedPackets", len(hostinfo.packetStore)).
|
||||||
Info("Handshake message received")
|
Info("Handshake message received")
|
||||||
|
|
||||||
hostinfo.remoteIndexId = hs.Details.ResponderIndex
|
hostinfo.remoteIndexId = hs.Details.ResponderIndex
|
||||||
|
@ -410,7 +412,7 @@ func ixHandshakeStage2(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [
|
||||||
// Complete our handshake and update metrics, this will replace any existing tunnels for this vpnIp
|
// Complete our handshake and update metrics, this will replace any existing tunnels for this vpnIp
|
||||||
//TODO: Complete here does not do a race avoidance, it will just take the new tunnel. Is this ok?
|
//TODO: Complete here does not do a race avoidance, it will just take the new tunnel. Is this ok?
|
||||||
f.handshakeManager.Complete(hostinfo, f)
|
f.handshakeManager.Complete(hostinfo, f)
|
||||||
hostinfo.handshakeComplete(f.l)
|
hostinfo.handshakeComplete(f.l, f.cachedPacketMetrics)
|
||||||
f.metricHandshakes.Update(duration)
|
f.metricHandshakes.Update(duration)
|
||||||
|
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/rcrowley/go-metrics"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,6 +42,8 @@ type HandshakeManager struct {
|
||||||
config HandshakeConfig
|
config HandshakeConfig
|
||||||
OutboundHandshakeTimer *SystemTimerWheel
|
OutboundHandshakeTimer *SystemTimerWheel
|
||||||
messageMetrics *MessageMetrics
|
messageMetrics *MessageMetrics
|
||||||
|
metricInitiated metrics.Counter
|
||||||
|
metricTimedOut metrics.Counter
|
||||||
l *logrus.Logger
|
l *logrus.Logger
|
||||||
|
|
||||||
// can be used to trigger outbound handshake for the given vpnIP
|
// can be used to trigger outbound handshake for the given vpnIP
|
||||||
|
@ -57,6 +60,8 @@ func NewHandshakeManager(l *logrus.Logger, tunCidr *net.IPNet, preferredRanges [
|
||||||
trigger: make(chan uint32, config.triggerBuffer),
|
trigger: make(chan uint32, config.triggerBuffer),
|
||||||
OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
|
OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
|
||||||
messageMetrics: config.messageMetrics,
|
messageMetrics: config.messageMetrics,
|
||||||
|
metricInitiated: metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
|
||||||
|
metricTimedOut: metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
|
||||||
l: l,
|
l: l,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -117,7 +122,7 @@ func (c *HandshakeManager) handleOutbound(vpnIP uint32, f EncWriter, lighthouseT
|
||||||
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
|
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
|
||||||
WithField("durationNs", time.Since(hostinfo.handshakeStart).Nanoseconds()).
|
WithField("durationNs", time.Since(hostinfo.handshakeStart).Nanoseconds()).
|
||||||
Info("Handshake timed out")
|
Info("Handshake timed out")
|
||||||
//TODO: emit metrics
|
c.metricTimedOut.Inc(1)
|
||||||
c.pendingHostMap.DeleteHostInfo(hostinfo)
|
c.pendingHostMap.DeleteHostInfo(hostinfo)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -181,6 +186,7 @@ func (c *HandshakeManager) AddVpnIP(vpnIP uint32) *HostInfo {
|
||||||
// main receive thread for very long by waiting to add items to the pending map
|
// main receive thread for very long by waiting to add items to the pending map
|
||||||
//TODO: what lock?
|
//TODO: what lock?
|
||||||
c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval)
|
c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval)
|
||||||
|
c.metricInitiated.Inc(1)
|
||||||
|
|
||||||
return hostinfo
|
return hostinfo
|
||||||
}
|
}
|
||||||
|
|
11
hostmap.go
11
hostmap.go
|
@ -77,6 +77,11 @@ type cachedPacket struct {
|
||||||
|
|
||||||
type packetCallback func(t NebulaMessageType, st NebulaMessageSubType, h *HostInfo, p, nb, out []byte)
|
type packetCallback func(t NebulaMessageType, st NebulaMessageSubType, h *HostInfo, p, nb, out []byte)
|
||||||
|
|
||||||
|
type cachedPacketMetrics struct {
|
||||||
|
sent metrics.Counter
|
||||||
|
dropped metrics.Counter
|
||||||
|
}
|
||||||
|
|
||||||
func NewHostMap(l *logrus.Logger, name string, vpnCIDR *net.IPNet, preferredRanges []*net.IPNet) *HostMap {
|
func NewHostMap(l *logrus.Logger, name string, vpnCIDR *net.IPNet, preferredRanges []*net.IPNet) *HostMap {
|
||||||
h := map[uint32]*HostInfo{}
|
h := map[uint32]*HostInfo{}
|
||||||
i := map[uint32]*HostInfo{}
|
i := map[uint32]*HostInfo{}
|
||||||
|
@ -435,7 +440,7 @@ func (i *HostInfo) TryPromoteBest(preferredRanges []*net.IPNet, ifce *Interface)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaMessageSubType, packet []byte, f packetCallback) {
|
func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaMessageSubType, packet []byte, f packetCallback, m *cachedPacketMetrics) {
|
||||||
//TODO: return the error so we can log with more context
|
//TODO: return the error so we can log with more context
|
||||||
if len(i.packetStore) < 100 {
|
if len(i.packetStore) < 100 {
|
||||||
tempPacket := make([]byte, len(packet))
|
tempPacket := make([]byte, len(packet))
|
||||||
|
@ -450,6 +455,7 @@ func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaM
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if l.Level >= logrus.DebugLevel {
|
} else if l.Level >= logrus.DebugLevel {
|
||||||
|
m.dropped.Inc(1)
|
||||||
i.logger(l).
|
i.logger(l).
|
||||||
WithField("length", len(i.packetStore)).
|
WithField("length", len(i.packetStore)).
|
||||||
WithField("stored", false).
|
WithField("stored", false).
|
||||||
|
@ -458,7 +464,7 @@ func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaM
|
||||||
}
|
}
|
||||||
|
|
||||||
// handshakeComplete will set the connection as ready to communicate, as well as flush any stored packets
|
// handshakeComplete will set the connection as ready to communicate, as well as flush any stored packets
|
||||||
func (i *HostInfo) handshakeComplete(l *logrus.Logger) {
|
func (i *HostInfo) handshakeComplete(l *logrus.Logger, m *cachedPacketMetrics) {
|
||||||
//TODO: I'm not certain the distinction between handshake complete and ConnectionState being ready matters because:
|
//TODO: I'm not certain the distinction between handshake complete and ConnectionState being ready matters because:
|
||||||
//TODO: HandshakeComplete means send stored packets and ConnectionState.ready means we are ready to send
|
//TODO: HandshakeComplete means send stored packets and ConnectionState.ready means we are ready to send
|
||||||
//TODO: if the transition from HandshakeComplete to ConnectionState.ready happens all within this function they are identical
|
//TODO: if the transition from HandshakeComplete to ConnectionState.ready happens all within this function they are identical
|
||||||
|
@ -479,6 +485,7 @@ func (i *HostInfo) handshakeComplete(l *logrus.Logger) {
|
||||||
for _, cp := range i.packetStore {
|
for _, cp := range i.packetStore {
|
||||||
cp.callback(cp.messageType, cp.messageSubType, i, cp.packet, nb, out)
|
cp.callback(cp.messageType, cp.messageSubType, i, cp.packet, nb, out)
|
||||||
}
|
}
|
||||||
|
m.sent.Inc(int64(len(i.packetStore)))
|
||||||
}
|
}
|
||||||
|
|
||||||
i.remotes.ResetBlockedRemotes()
|
i.remotes.ResetBlockedRemotes()
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket,
|
||||||
// the packet queue.
|
// the packet queue.
|
||||||
ci.queueLock.Lock()
|
ci.queueLock.Lock()
|
||||||
if !ci.ready {
|
if !ci.ready {
|
||||||
hostinfo.cachePacket(f.l, message, 0, packet, f.sendMessageNow)
|
hostinfo.cachePacket(f.l, message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics)
|
||||||
ci.queueLock.Unlock()
|
ci.queueLock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -164,7 +164,7 @@ func (f *Interface) SendMessageToVpnIp(t NebulaMessageType, st NebulaMessageSubT
|
||||||
// the packet queue.
|
// the packet queue.
|
||||||
hostInfo.ConnectionState.queueLock.Lock()
|
hostInfo.ConnectionState.queueLock.Lock()
|
||||||
if !hostInfo.ConnectionState.ready {
|
if !hostInfo.ConnectionState.ready {
|
||||||
hostInfo.cachePacket(f.l, t, st, p, f.sendMessageToVpnIp)
|
hostInfo.cachePacket(f.l, t, st, p, f.sendMessageToVpnIp, f.cachedPacketMetrics)
|
||||||
hostInfo.ConnectionState.queueLock.Unlock()
|
hostInfo.ConnectionState.queueLock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
15
interface.go
15
interface.go
|
@ -77,9 +77,11 @@ type Interface struct {
|
||||||
writers []*udpConn
|
writers []*udpConn
|
||||||
readers []io.ReadWriteCloser
|
readers []io.ReadWriteCloser
|
||||||
|
|
||||||
metricHandshakes metrics.Histogram
|
metricHandshakes metrics.Histogram
|
||||||
messageMetrics *MessageMetrics
|
messageMetrics *MessageMetrics
|
||||||
l *logrus.Logger
|
cachedPacketMetrics *cachedPacketMetrics
|
||||||
|
|
||||||
|
l *logrus.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInterface(c *InterfaceConfig) (*Interface, error) {
|
func NewInterface(c *InterfaceConfig) (*Interface, error) {
|
||||||
|
@ -122,7 +124,12 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) {
|
||||||
|
|
||||||
metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
|
metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
|
||||||
messageMetrics: c.MessageMetrics,
|
messageMetrics: c.MessageMetrics,
|
||||||
l: c.l,
|
cachedPacketMetrics: &cachedPacketMetrics{
|
||||||
|
sent: metrics.GetOrRegisterCounter("hostinfo.cached_packets.sent", nil),
|
||||||
|
dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
|
||||||
|
},
|
||||||
|
|
||||||
|
l: c.l,
|
||||||
}
|
}
|
||||||
|
|
||||||
ifce.connectionManager = newConnectionManager(c.l, ifce, c.checkInterval, c.pendingDeletionInterval)
|
ifce.connectionManager = newConnectionManager(c.l, ifce, c.checkInterval, c.pendingDeletionInterval)
|
||||||
|
|
Loading…
Reference in New Issue