mirror of https://github.com/slackhq/nebula.git
simplify
This commit is contained in:
parent
bcaefce4ac
commit
6f27f46965
|
@ -58,11 +58,11 @@ func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface
|
|||
nc := &connectionManager{
|
||||
hostMap: intf.hostMap,
|
||||
in: make(map[uint32]struct{}),
|
||||
inLock: newSyncRWMutex(mutexKey{Type: mutexKeyTypeConnectionManagerIn}),
|
||||
inLock: newSyncRWMutex("connection-manager-in"),
|
||||
out: make(map[uint32]struct{}),
|
||||
outLock: newSyncRWMutex(mutexKey{Type: mutexKeyTypeConnectionManagerOut}),
|
||||
outLock: newSyncRWMutex("connection-manager-out"),
|
||||
relayUsed: make(map[uint32]struct{}),
|
||||
relayUsedLock: newSyncRWMutex(mutexKey{Type: mutexKeyTypeConnectionManagerRelayUsed}),
|
||||
relayUsedLock: newSyncRWMutex("connection-manager-relay-used"),
|
||||
trafficTimer: NewLockingTimerWheel[uint32](time.Millisecond*500, max),
|
||||
intf: intf,
|
||||
pendingDeletion: make(map[uint32]struct{}),
|
||||
|
|
|
@ -70,7 +70,7 @@ func NewConnectionState(l *logrus.Logger, cipher string, certState *CertState, i
|
|||
initiator: initiator,
|
||||
window: b,
|
||||
myCert: certState.Certificate,
|
||||
writeLock: newSyncMutex(mutexKey{Type: mutexKeyTypeConnectionStateWrite}),
|
||||
writeLock: newSyncMutex("connection-state-write"),
|
||||
}
|
||||
|
||||
return ci
|
||||
|
|
|
@ -148,7 +148,7 @@ func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.D
|
|||
|
||||
return &Firewall{
|
||||
Conntrack: &FirewallConntrack{
|
||||
syncMutex: newSyncMutex(mutexKey{Type: mutexKeyTypeFirewallConntrack}),
|
||||
syncMutex: newSyncMutex("firewall-conntrack"),
|
||||
Conns: make(map[firewall.Packet]*conn),
|
||||
TimerWheel: NewTimerWheel[firewall.Packet](min, max),
|
||||
},
|
||||
|
|
2
go.mod
2
go.mod
|
@ -10,6 +10,7 @@ require (
|
|||
github.com/flynn/noise v1.0.0
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/google/gopacket v1.1.19
|
||||
github.com/heimdalr/dag v1.4.0
|
||||
github.com/kardianos/service v1.2.2
|
||||
github.com/miekg/dns v1.1.56
|
||||
github.com/nbrownus/go-metrics-prometheus v0.0.0-20210712211119-974a6260965f
|
||||
|
@ -43,7 +44,6 @@ require (
|
|||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/btree v1.0.1 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/heimdalr/dag v1.4.0 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
|
||||
|
|
1
go.sum
1
go.sum
|
@ -33,6 +33,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
|
|||
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
|
||||
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
|
||||
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
|
||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
|
|
|
@ -127,7 +127,7 @@ func ixHandshakeStage1(f *Interface, addr *udp.Addr, via *ViaSender, packet []by
|
|||
}
|
||||
|
||||
hostinfo := &HostInfo{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeHostInfo, ID: uint32(vpnIp)}),
|
||||
syncRWMutex: newSyncRWMutex("hostinfo"),
|
||||
ConnectionState: ci,
|
||||
localIndexId: myIndex,
|
||||
remoteIndexId: hs.Details.InitiatorIndex,
|
||||
|
@ -135,7 +135,7 @@ func ixHandshakeStage1(f *Interface, addr *udp.Addr, via *ViaSender, packet []by
|
|||
HandshakePacket: make(map[uint8][]byte, 0),
|
||||
lastHandshakeTime: hs.Details.Time,
|
||||
relayState: RelayState{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeRelayState, ID: uint32(vpnIp)}),
|
||||
syncRWMutex: newSyncRWMutex("relay-state"),
|
||||
relays: map[iputil.VpnIp]struct{}{},
|
||||
relayForByIp: map[iputil.VpnIp]*Relay{},
|
||||
relayForByIdx: map[uint32]*Relay{},
|
||||
|
|
|
@ -102,7 +102,7 @@ func (hh *HandshakeHostInfo) cachePacket(l *logrus.Logger, t header.MessageType,
|
|||
|
||||
func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *LightHouse, outside udp.Conn, config HandshakeConfig) *HandshakeManager {
|
||||
return &HandshakeManager{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeHandshakeManager}),
|
||||
syncRWMutex: newSyncRWMutex("handshake-manager"),
|
||||
vpnIps: map[iputil.VpnIp]*HandshakeHostInfo{},
|
||||
indexes: map[uint32]*HandshakeHostInfo{},
|
||||
mainHostMap: mainHostMap,
|
||||
|
@ -385,11 +385,11 @@ func (hm *HandshakeManager) StartHandshake(vpnIp iputil.VpnIp, cacheCb func(*Han
|
|||
}
|
||||
|
||||
hostinfo := &HostInfo{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeHostInfo, ID: uint32(vpnIp)}),
|
||||
syncRWMutex: newSyncRWMutex("hostinfo"),
|
||||
vpnIp: vpnIp,
|
||||
HandshakePacket: make(map[uint8][]byte, 0),
|
||||
relayState: RelayState{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeRelayState, ID: uint32(vpnIp)}),
|
||||
syncRWMutex: newSyncRWMutex("relay-state"),
|
||||
relays: map[iputil.VpnIp]struct{}{},
|
||||
relayForByIp: map[iputil.VpnIp]*Relay{},
|
||||
relayForByIdx: map[uint32]*Relay{},
|
||||
|
@ -397,7 +397,7 @@ func (hm *HandshakeManager) StartHandshake(vpnIp iputil.VpnIp, cacheCb func(*Han
|
|||
}
|
||||
|
||||
hh := &HandshakeHostInfo{
|
||||
syncMutex: newSyncMutex(mutexKey{Type: mutexKeyTypeHandshakeHostInfo, ID: uint32(vpnIp)}),
|
||||
syncMutex: newSyncMutex("handshake-hostinfo"),
|
||||
hostinfo: hostinfo,
|
||||
startTime: time.Now(),
|
||||
}
|
||||
|
|
|
@ -260,7 +260,7 @@ func NewHostMap(l *logrus.Logger, vpnCIDR *net.IPNet, preferredRanges []*net.IPN
|
|||
r := map[uint32]*HostInfo{}
|
||||
relays := map[uint32]*HostInfo{}
|
||||
m := HostMap{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeHostMap}),
|
||||
syncRWMutex: newSyncRWMutex("hostmap"),
|
||||
Indexes: i,
|
||||
Relays: relays,
|
||||
RemoteIndexes: r,
|
||||
|
|
|
@ -100,7 +100,7 @@ func NewLightHouseFromConfig(ctx context.Context, l *logrus.Logger, c *config.C,
|
|||
|
||||
ones, _ := myVpnNet.Mask.Size()
|
||||
h := LightHouse{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeLightHouse}),
|
||||
syncRWMutex: newSyncRWMutex("lighthouse"),
|
||||
ctx: ctx,
|
||||
amLighthouse: amLighthouse,
|
||||
myVpnIp: iputil.Ip2VpnIp(myVpnNet.IP),
|
||||
|
|
64
mutex.go
64
mutex.go
|
@ -1,64 +0,0 @@
|
|||
package nebula
|
||||
|
||||
import "fmt"
|
||||
|
||||
type mutexKeyType string
|
||||
|
||||
const (
|
||||
mutexKeyTypeHostMap mutexKeyType = "hostmap"
|
||||
|
||||
mutexKeyTypeLightHouse = "lighthouse"
|
||||
mutexKeyTypeRemoteList = "remote-list"
|
||||
mutexKeyTypeFirewallConntrack = "firewall-conntrack"
|
||||
mutexKeyTypeHostInfo = "hostinfo"
|
||||
mutexKeyTypeRelayState = "relay-state"
|
||||
mutexKeyTypeHandshakeHostInfo = "handshake-hostinfo"
|
||||
mutexKeyTypeHandshakeManager = "handshake-manager"
|
||||
mutexKeyTypeConnectionStateWrite = "connection-state-write-lock"
|
||||
|
||||
mutexKeyTypeConnectionManagerIn = "connection-manager-in-lock"
|
||||
mutexKeyTypeConnectionManagerOut = "connection-manager-out-lock"
|
||||
mutexKeyTypeConnectionManagerRelayUsed = "connection-manager-relay-used-lock"
|
||||
)
|
||||
|
||||
// For each Key in this map, the Value is a list of lock types you can already have
|
||||
// when you want to grab that Key. This ensures that locks are always fetched
|
||||
// in the same order, to prevent deadlocks.
|
||||
var allowedConcurrentLocks = map[mutexKeyType][]mutexKeyType{
|
||||
mutexKeyTypeHostMap: {mutexKeyTypeHandshakeHostInfo},
|
||||
mutexKeyTypeFirewallConntrack: {mutexKeyTypeHandshakeHostInfo},
|
||||
|
||||
mutexKeyTypeHandshakeManager: {mutexKeyTypeHostMap},
|
||||
mutexKeyTypeConnectionStateWrite: {mutexKeyTypeHostMap},
|
||||
|
||||
mutexKeyTypeLightHouse: {mutexKeyTypeHandshakeManager},
|
||||
mutexKeyTypeRemoteList: {mutexKeyTypeLightHouse},
|
||||
|
||||
mutexKeyTypeConnectionManagerIn: {mutexKeyTypeHostMap},
|
||||
mutexKeyTypeConnectionManagerOut: {mutexKeyTypeConnectionStateWrite, mutexKeyTypeConnectionManagerIn},
|
||||
mutexKeyTypeConnectionManagerRelayUsed: {mutexKeyTypeHandshakeHostInfo},
|
||||
|
||||
mutexKeyTypeRelayState: {mutexKeyTypeHostMap, mutexKeyTypeConnectionManagerRelayUsed},
|
||||
}
|
||||
|
||||
type mutexKey struct {
|
||||
Type mutexKeyType
|
||||
ID uint32
|
||||
}
|
||||
|
||||
type mutexValue struct {
|
||||
file string
|
||||
line int
|
||||
}
|
||||
|
||||
func (m mutexKey) String() string {
|
||||
if m.ID == 0 {
|
||||
return fmt.Sprintf("%s", m.Type)
|
||||
} else {
|
||||
return fmt.Sprintf("%s(%d)", m.Type, m.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func (m mutexValue) String() string {
|
||||
return fmt.Sprintf("%s:%d", m.file, m.line)
|
||||
}
|
|
@ -12,35 +12,66 @@ import (
|
|||
"github.com/timandy/routine"
|
||||
)
|
||||
|
||||
type mutexKey = string
|
||||
|
||||
// For each Key in this map, the Value is a list of lock types you can already have
|
||||
// when you want to grab that Key. This ensures that locks are always fetched
|
||||
// in the same order, to prevent deadlocks.
|
||||
var allowedConcurrentLocks = map[mutexKey][]mutexKey{
|
||||
"connection-manager-in": {"hostmap"},
|
||||
"connection-manager-out": {"connection-state-write", "connection-manager-in"},
|
||||
"connection-manager-relay-used": {"handshake-hostinfo"},
|
||||
"connection-state-write": {"hostmap"},
|
||||
"firewall-conntrack": {"handshake-hostinfo"},
|
||||
"handshake-manager": {"hostmap"},
|
||||
"hostmap": {"handshake-hostinfo"},
|
||||
"lighthouse": {"handshake-manager"},
|
||||
"relay-state": {"hostmap", "connection-manager-relay-used"},
|
||||
"remote-list": {"lighthouse"},
|
||||
}
|
||||
|
||||
type mutexValue struct {
|
||||
file string
|
||||
line int
|
||||
}
|
||||
|
||||
func (m mutexValue) String() string {
|
||||
return fmt.Sprintf("%s:%d", m.file, m.line)
|
||||
}
|
||||
|
||||
var threadLocal routine.ThreadLocal = routine.NewThreadLocalWithInitial(func() any { return map[mutexKey]mutexValue{} })
|
||||
|
||||
var allowedDAG *dag.DAG
|
||||
|
||||
// We build a directed acyclic graph to assert that the locks can only be
|
||||
// acquired in a determined order, If there are cycles in the DAG, then we
|
||||
// know that the locking order is not guaranteed.
|
||||
func init() {
|
||||
allowedDAG = dag.NewDAG()
|
||||
for k, v := range allowedConcurrentLocks {
|
||||
allowedDAG.AddVertexByID(string(k), k)
|
||||
_ = allowedDAG.AddVertexByID(k, k)
|
||||
for _, t := range v {
|
||||
if _, err := allowedDAG.GetVertex(string(t)); err != nil {
|
||||
allowedDAG.AddVertexByID(string(t), t)
|
||||
}
|
||||
_ = allowedDAG.AddVertexByID(t, t)
|
||||
}
|
||||
}
|
||||
for k, v := range allowedConcurrentLocks {
|
||||
for _, t := range v {
|
||||
allowedDAG.AddEdge(string(t), string(k))
|
||||
if err := allowedDAG.AddEdge(t, k); err != nil {
|
||||
panic(fmt.Errorf("Failed to assembled DAG for allowedConcurrentLocks: %w", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rebuild allowedConcurrentLocks as a flattened list of all possibilities
|
||||
for k := range allowedConcurrentLocks {
|
||||
anc, err := allowedDAG.GetAncestors(string(k))
|
||||
anc, err := allowedDAG.GetAncestors(k)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var allowed []mutexKeyType
|
||||
var allowed []mutexKey
|
||||
for t := range anc {
|
||||
allowed = append(allowed, mutexKeyType(t))
|
||||
allowed = append(allowed, mutexKey(t))
|
||||
}
|
||||
allowedConcurrentLocks[k] = allowed
|
||||
}
|
||||
|
@ -76,7 +107,7 @@ func alertMutex(err error) {
|
|||
}
|
||||
|
||||
func checkMutex(state map[mutexKey]mutexValue, add mutexKey) {
|
||||
allowedConcurrent := allowedConcurrentLocks[add.Type]
|
||||
allowedConcurrent := allowedConcurrentLocks[add]
|
||||
|
||||
for k, v := range state {
|
||||
if add == k {
|
||||
|
@ -86,13 +117,13 @@ func checkMutex(state map[mutexKey]mutexValue, add mutexKey) {
|
|||
// TODO use slices.Contains, but requires go1.21
|
||||
var found bool
|
||||
for _, a := range allowedConcurrent {
|
||||
if a == k.Type {
|
||||
if a == k {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
alertMutex(fmt.Errorf("grabbing %s lock and already have these locks: %s", add.Type, state))
|
||||
alertMutex(fmt.Errorf("grabbing %s lock and already have these locks: %s", add, state))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"sync"
|
||||
)
|
||||
|
||||
type mutexKey = string
|
||||
type syncRWMutex = sync.RWMutex
|
||||
type syncMutex = sync.Mutex
|
||||
|
||||
|
|
|
@ -216,7 +216,7 @@ type RemoteList struct {
|
|||
// NewRemoteList creates a new empty RemoteList
|
||||
func NewRemoteList(shouldAdd func(netip.Addr) bool) *RemoteList {
|
||||
return &RemoteList{
|
||||
syncRWMutex: newSyncRWMutex(mutexKey{Type: mutexKeyTypeRemoteList}),
|
||||
syncRWMutex: newSyncRWMutex("remote-list"),
|
||||
addrs: make([]*udp.Addr, 0),
|
||||
relays: make([]*iputil.VpnIp, 0),
|
||||
cache: make(map[iputil.VpnIp]*cache),
|
||||
|
|
Loading…
Reference in New Issue