mirror of https://github.com/slackhq/nebula.git
Send the lh update worker into its own routine instead of taking over the reload routine (#935)
This commit is contained in:
parent
959b015b3b
commit
14d0106716
20
control.go
20
control.go
|
@ -27,12 +27,13 @@ type controlHostLister interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Control struct {
|
type Control struct {
|
||||||
f *Interface
|
f *Interface
|
||||||
l *logrus.Logger
|
l *logrus.Logger
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
sshStart func()
|
sshStart func()
|
||||||
statsStart func()
|
statsStart func()
|
||||||
dnsStart func()
|
dnsStart func()
|
||||||
|
lighthouseStart func()
|
||||||
}
|
}
|
||||||
|
|
||||||
type ControlHostInfo struct {
|
type ControlHostInfo struct {
|
||||||
|
@ -63,12 +64,15 @@ func (c *Control) Start() {
|
||||||
if c.dnsStart != nil {
|
if c.dnsStart != nil {
|
||||||
go c.dnsStart()
|
go c.dnsStart()
|
||||||
}
|
}
|
||||||
|
if c.lighthouseStart != nil {
|
||||||
|
c.lighthouseStart()
|
||||||
|
}
|
||||||
|
|
||||||
// Start reading packets.
|
// Start reading packets.
|
||||||
c.f.run()
|
c.f.run()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop signals nebula to shutdown, returns after the shutdown is complete
|
// Stop signals nebula to shutdown and close all tunnels, returns after the shutdown is complete
|
||||||
func (c *Control) Stop() {
|
func (c *Control) Stop() {
|
||||||
// Stop the handshakeManager (and other services), to prevent new tunnels from
|
// Stop the handshakeManager (and other services), to prevent new tunnels from
|
||||||
// being created while we're shutting them all down.
|
// being created while we're shutting them all down.
|
||||||
|
@ -98,7 +102,7 @@ func (c *Control) RebindUDPServer() {
|
||||||
_ = c.f.outside.Rebind()
|
_ = c.f.outside.Rebind()
|
||||||
|
|
||||||
// Trigger a lighthouse update, useful for mobile clients that should have an update interval of 0
|
// Trigger a lighthouse update, useful for mobile clients that should have an update interval of 0
|
||||||
c.f.lightHouse.SendUpdate(c.f)
|
c.f.lightHouse.SendUpdate()
|
||||||
|
|
||||||
// Let the main interface know that we rebound so that underlying tunnels know to trigger punches from their remotes
|
// Let the main interface know that we rebound so that underlying tunnels know to trigger punches from their remotes
|
||||||
c.f.rebindCount++
|
c.f.rebindCount++
|
||||||
|
|
|
@ -64,11 +64,10 @@ type LightHouse struct {
|
||||||
staticList atomic.Pointer[map[iputil.VpnIp]struct{}]
|
staticList atomic.Pointer[map[iputil.VpnIp]struct{}]
|
||||||
lighthouses atomic.Pointer[map[iputil.VpnIp]struct{}]
|
lighthouses atomic.Pointer[map[iputil.VpnIp]struct{}]
|
||||||
|
|
||||||
interval atomic.Int64
|
interval atomic.Int64
|
||||||
updateCancel context.CancelFunc
|
updateCancel context.CancelFunc
|
||||||
updateParentCtx context.Context
|
ifce EncWriter
|
||||||
updateUdp EncWriter
|
nebulaPort uint32 // 32 bits because protobuf does not have a uint16
|
||||||
nebulaPort uint32 // 32 bits because protobuf does not have a uint16
|
|
||||||
|
|
||||||
advertiseAddrs atomic.Pointer[[]netIpAndPort]
|
advertiseAddrs atomic.Pointer[[]netIpAndPort]
|
||||||
|
|
||||||
|
@ -217,7 +216,7 @@ func (lh *LightHouse) reload(c *config.C, initial bool) error {
|
||||||
lh.updateCancel()
|
lh.updateCancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
lh.LhUpdateWorker(lh.updateParentCtx, lh.updateUdp)
|
lh.StartUpdateWorker()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -754,33 +753,33 @@ func NewUDPAddrFromLH6(ipp *Ip6AndPort) *udp.Addr {
|
||||||
return udp.NewAddr(lhIp6ToIp(ipp), uint16(ipp.Port))
|
return udp.NewAddr(lhIp6ToIp(ipp), uint16(ipp.Port))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lh *LightHouse) LhUpdateWorker(ctx context.Context, f EncWriter) {
|
func (lh *LightHouse) StartUpdateWorker() {
|
||||||
lh.updateParentCtx = ctx
|
|
||||||
lh.updateUdp = f
|
|
||||||
|
|
||||||
interval := lh.GetUpdateInterval()
|
interval := lh.GetUpdateInterval()
|
||||||
if lh.amLighthouse || interval == 0 {
|
if lh.amLighthouse || interval == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
clockSource := time.NewTicker(time.Second * time.Duration(interval))
|
clockSource := time.NewTicker(time.Second * time.Duration(interval))
|
||||||
updateCtx, cancel := context.WithCancel(ctx)
|
updateCtx, cancel := context.WithCancel(lh.ctx)
|
||||||
lh.updateCancel = cancel
|
lh.updateCancel = cancel
|
||||||
defer clockSource.Stop()
|
|
||||||
|
|
||||||
for {
|
go func() {
|
||||||
lh.SendUpdate(f)
|
defer clockSource.Stop()
|
||||||
|
|
||||||
select {
|
for {
|
||||||
case <-updateCtx.Done():
|
lh.SendUpdate()
|
||||||
return
|
|
||||||
case <-clockSource.C:
|
select {
|
||||||
continue
|
case <-updateCtx.Done():
|
||||||
|
return
|
||||||
|
case <-clockSource.C:
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lh *LightHouse) SendUpdate(f EncWriter) {
|
func (lh *LightHouse) SendUpdate() {
|
||||||
var v4 []*Ip4AndPort
|
var v4 []*Ip4AndPort
|
||||||
var v6 []*Ip6AndPort
|
var v6 []*Ip6AndPort
|
||||||
|
|
||||||
|
@ -833,7 +832,7 @@ func (lh *LightHouse) SendUpdate(f EncWriter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for vpnIp := range lighthouses {
|
for vpnIp := range lighthouses {
|
||||||
f.SendMessageToVpnIp(header.LightHouse, 0, vpnIp, mm, nb, out)
|
lh.ifce.SendMessageToVpnIp(header.LightHouse, 0, vpnIp, mm, nb, out)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,35 @@ func Test_lhStaticMapping(t *testing.T) {
|
||||||
assert.EqualError(t, err, "lighthouse 10.128.0.3 does not have a static_host_map entry")
|
assert.EqualError(t, err, "lighthouse 10.128.0.3 does not have a static_host_map entry")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReloadLighthouseInterval(t *testing.T) {
|
||||||
|
l := test.NewLogger()
|
||||||
|
_, myVpnNet, _ := net.ParseCIDR("10.128.0.1/16")
|
||||||
|
lh1 := "10.128.0.2"
|
||||||
|
|
||||||
|
c := config.NewC(l)
|
||||||
|
c.Settings["lighthouse"] = map[interface{}]interface{}{
|
||||||
|
"hosts": []interface{}{lh1},
|
||||||
|
"interval": "1s",
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Settings["static_host_map"] = map[interface{}]interface{}{lh1: []interface{}{"1.1.1.1:4242"}}
|
||||||
|
lh, err := NewLightHouseFromConfig(context.Background(), l, c, myVpnNet, nil, nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
lh.ifce = &mockEncWriter{}
|
||||||
|
|
||||||
|
// The first one routine is kicked off by main.go currently, lets make sure that one dies
|
||||||
|
c.ReloadConfigString("lighthouse:\n interval: 5")
|
||||||
|
assert.Equal(t, int64(5), lh.interval.Load())
|
||||||
|
|
||||||
|
// Subsequent calls are killed off by the LightHouse.Reload function
|
||||||
|
c.ReloadConfigString("lighthouse:\n interval: 10")
|
||||||
|
assert.Equal(t, int64(10), lh.interval.Load())
|
||||||
|
|
||||||
|
// If this completes then nothing is stealing our reload routine
|
||||||
|
c.ReloadConfigString("lighthouse:\n interval: 11")
|
||||||
|
assert.Equal(t, int64(11), lh.interval.Load())
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkLighthouseHandleRequest(b *testing.B) {
|
func BenchmarkLighthouseHandleRequest(b *testing.B) {
|
||||||
l := test.NewLogger()
|
l := test.NewLogger()
|
||||||
_, myVpnNet, _ := net.ParseCIDR("10.128.0.1/0")
|
_, myVpnNet, _ := net.ParseCIDR("10.128.0.1/0")
|
||||||
|
|
13
main.go
13
main.go
|
@ -315,13 +315,12 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
|
||||||
// TODO: Better way to attach these, probably want a new interface in InterfaceConfig
|
// TODO: Better way to attach these, probably want a new interface in InterfaceConfig
|
||||||
// I don't want to make this initial commit too far-reaching though
|
// I don't want to make this initial commit too far-reaching though
|
||||||
ifce.writers = udpConns
|
ifce.writers = udpConns
|
||||||
|
lightHouse.ifce = ifce
|
||||||
|
|
||||||
ifce.RegisterConfigChangeCallbacks(c)
|
ifce.RegisterConfigChangeCallbacks(c)
|
||||||
|
|
||||||
ifce.reloadSendRecvError(c)
|
ifce.reloadSendRecvError(c)
|
||||||
|
|
||||||
go handshakeManager.Run(ctx, ifce)
|
go handshakeManager.Run(ctx, ifce)
|
||||||
go lightHouse.LhUpdateWorker(ctx, ifce)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO - stats third-party modules start uncancellable goroutines. Update those libs to accept
|
// TODO - stats third-party modules start uncancellable goroutines. Update those libs to accept
|
||||||
|
@ -348,5 +347,13 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
|
||||||
dnsStart = dnsMain(l, hostMap, c)
|
dnsStart = dnsMain(l, hostMap, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Control{ifce, l, cancel, sshStart, statsStart, dnsStart}, nil
|
return &Control{
|
||||||
|
ifce,
|
||||||
|
l,
|
||||||
|
cancel,
|
||||||
|
sshStart,
|
||||||
|
statsStart,
|
||||||
|
dnsStart,
|
||||||
|
lightHouse.StartUpdateWorker,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue