diff --git a/control.go b/control.go index 07b42f2..4af115c 100644 --- a/control.go +++ b/control.go @@ -27,12 +27,13 @@ type controlHostLister interface { } type Control struct { - f *Interface - l *logrus.Logger - cancel context.CancelFunc - sshStart func() - statsStart func() - dnsStart func() + f *Interface + l *logrus.Logger + cancel context.CancelFunc + sshStart func() + statsStart func() + dnsStart func() + lighthouseStart func() } type ControlHostInfo struct { @@ -63,12 +64,15 @@ func (c *Control) Start() { if c.dnsStart != nil { go c.dnsStart() } + if c.lighthouseStart != nil { + c.lighthouseStart() + } // Start reading packets. c.f.run() } -// Stop signals nebula to shutdown, returns after the shutdown is complete +// Stop signals nebula to shutdown and close all tunnels, returns after the shutdown is complete func (c *Control) Stop() { // Stop the handshakeManager (and other services), to prevent new tunnels from // being created while we're shutting them all down. @@ -98,7 +102,7 @@ func (c *Control) RebindUDPServer() { _ = c.f.outside.Rebind() // Trigger a lighthouse update, useful for mobile clients that should have an update interval of 0 - c.f.lightHouse.SendUpdate(c.f) + c.f.lightHouse.SendUpdate() // Let the main interface know that we rebound so that underlying tunnels know to trigger punches from their remotes c.f.rebindCount++ diff --git a/lighthouse.go b/lighthouse.go index f281f9b..6c46663 100644 --- a/lighthouse.go +++ b/lighthouse.go @@ -64,11 +64,10 @@ type LightHouse struct { staticList atomic.Pointer[map[iputil.VpnIp]struct{}] lighthouses atomic.Pointer[map[iputil.VpnIp]struct{}] - interval atomic.Int64 - updateCancel context.CancelFunc - updateParentCtx context.Context - updateUdp EncWriter - nebulaPort uint32 // 32 bits because protobuf does not have a uint16 + interval atomic.Int64 + updateCancel context.CancelFunc + ifce EncWriter + nebulaPort uint32 // 32 bits because protobuf does not have a uint16 advertiseAddrs 
atomic.Pointer[[]netIpAndPort] @@ -217,7 +216,7 @@ func (lh *LightHouse) reload(c *config.C, initial bool) error { lh.updateCancel() } - lh.LhUpdateWorker(lh.updateParentCtx, lh.updateUdp) + lh.StartUpdateWorker() } } @@ -754,33 +753,33 @@ func NewUDPAddrFromLH6(ipp *Ip6AndPort) *udp.Addr { return udp.NewAddr(lhIp6ToIp(ipp), uint16(ipp.Port)) } -func (lh *LightHouse) LhUpdateWorker(ctx context.Context, f EncWriter) { - lh.updateParentCtx = ctx - lh.updateUdp = f - +func (lh *LightHouse) StartUpdateWorker() { interval := lh.GetUpdateInterval() if lh.amLighthouse || interval == 0 { return } clockSource := time.NewTicker(time.Second * time.Duration(interval)) - updateCtx, cancel := context.WithCancel(ctx) + updateCtx, cancel := context.WithCancel(lh.ctx) lh.updateCancel = cancel - defer clockSource.Stop() - for { - lh.SendUpdate(f) + go func() { + defer clockSource.Stop() - select { - case <-updateCtx.Done(): - return - case <-clockSource.C: - continue + for { + lh.SendUpdate() + + select { + case <-updateCtx.Done(): + return + case <-clockSource.C: + continue + } } - } + }() } -func (lh *LightHouse) SendUpdate(f EncWriter) { +func (lh *LightHouse) SendUpdate() { var v4 []*Ip4AndPort var v6 []*Ip6AndPort @@ -833,7 +832,7 @@ func (lh *LightHouse) SendUpdate(f EncWriter) { } for vpnIp := range lighthouses { - f.SendMessageToVpnIp(header.LightHouse, 0, vpnIp, mm, nb, out) + lh.ifce.SendMessageToVpnIp(header.LightHouse, 0, vpnIp, mm, nb, out) } } diff --git a/lighthouse_test.go b/lighthouse_test.go index 73632ac..66427e3 100644 --- a/lighthouse_test.go +++ b/lighthouse_test.go @@ -66,6 +66,35 @@ func Test_lhStaticMapping(t *testing.T) { assert.EqualError(t, err, "lighthouse 10.128.0.3 does not have a static_host_map entry") } +func TestReloadLighthouseInterval(t *testing.T) { + l := test.NewLogger() + _, myVpnNet, _ := net.ParseCIDR("10.128.0.1/16") + lh1 := "10.128.0.2" + + c := config.NewC(l) + c.Settings["lighthouse"] = map[interface{}]interface{}{ + "hosts": 
[]interface{}{lh1}, + "interval": "1s", + } + + c.Settings["static_host_map"] = map[interface{}]interface{}{lh1: []interface{}{"1.1.1.1:4242"}} + lh, err := NewLightHouseFromConfig(context.Background(), l, c, myVpnNet, nil, nil) + assert.NoError(t, err) + lh.ifce = &mockEncWriter{} + + // The first routine is kicked off by main.go currently, let's make sure that one dies + c.ReloadConfigString("lighthouse:\n interval: 5") + assert.Equal(t, int64(5), lh.interval.Load()) + + // Subsequent calls are killed off by the LightHouse.Reload function + c.ReloadConfigString("lighthouse:\n interval: 10") + assert.Equal(t, int64(10), lh.interval.Load()) + + // If this completes then nothing is stealing our reload routine + c.ReloadConfigString("lighthouse:\n interval: 11") + assert.Equal(t, int64(11), lh.interval.Load()) +} + func BenchmarkLighthouseHandleRequest(b *testing.B) { l := test.NewLogger() _, myVpnNet, _ := net.ParseCIDR("10.128.0.1/0") diff --git a/main.go b/main.go index 5845b76..ab2bd51 100644 --- a/main.go +++ b/main.go @@ -315,13 +315,12 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg // TODO: Better way to attach these, probably want a new interface in InterfaceConfig // I don't want to make this initial commit too far-reaching though ifce.writers = udpConns + lightHouse.ifce = ifce ifce.RegisterConfigChangeCallbacks(c) - ifce.reloadSendRecvError(c) go handshakeManager.Run(ctx, ifce) - go lightHouse.LhUpdateWorker(ctx, ifce) } // TODO - stats third-party modules start uncancellable goroutines. Update those libs to accept @@ -348,5 +347,13 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg dnsStart = dnsMain(l, hostMap, c) } - return &Control{ifce, l, cancel, sshStart, statsStart, dnsStart}, nil + return &Control{ + ifce, + l, + cancel, + sshStart, + statsStart, + dnsStart, + lightHouse.StartUpdateWorker, + }, nil }