diff --git a/connection_manager.go b/connection_manager.go index be2ac59..e76cd95 100644 --- a/connection_manager.go +++ b/connection_manager.go @@ -247,7 +247,7 @@ func (n *connectionManager) HandleDeletionTick(now time.Time) { break } - hostinfo, err := n.hostMap.QueryIndex(localIndex) + hostinfo, mainHostInfo, err := n.hostMap.QueryIndexIsPrimary(localIndex) if err != nil { n.l.WithField("localIndex", localIndex).Debugf("Not found in hostmap") n.ClearLocalIndex(localIndex) @@ -269,6 +269,12 @@ func (n *connectionManager) HandleDeletionTick(now time.Time) { n.ClearLocalIndex(localIndex) n.ClearPendingDeletion(localIndex) + + if !mainHostInfo { + // This hostinfo is still being used despite not being the primary hostinfo for this vpn ip + // Keep tracking so that we can tear it down when it goes away + n.Out(localIndex) + } continue } diff --git a/control.go b/control.go index 9858646..203278d 100644 --- a/control.go +++ b/control.go @@ -198,7 +198,7 @@ func (c *Control) CloseAllTunnels(excludeLighthouses bool) (closed int) { hostInfos := []*HostInfo{} // Grab the hostMap lock to access the Hosts map c.f.hostMap.Lock() - for _, relayHost := range c.f.hostMap.Hosts { + for _, relayHost := range c.f.hostMap.Indexes { if _, ok := relayingHosts[relayHost.vpnIp]; !ok { hostInfos = append(hostInfos, relayHost) } diff --git a/e2e/handshakes_test.go b/e2e/handshakes_test.go index d12412e..8e33deb 100644 --- a/e2e/handshakes_test.go +++ b/e2e/handshakes_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/slackhq/nebula" "github.com/slackhq/nebula/e2e/router" "github.com/slackhq/nebula/header" @@ -393,43 +394,19 @@ func TestStage1RaceRelays(t *testing.T) { relayControl.Start() theirControl.Start() - r.Log("Trigger a handshake to start on both me and relay") - myControl.InjectTunUDPPacket(relayVpnIpNet.IP, 80, 80, []byte("Hi from me")) - relayControl.InjectTunUDPPacket(myVpnIpNet.IP, 80, 80, []byte("Hi from relay")) - - r.Log("Get both stage 1 handshake packets") - //TODO: this is where it breaks, we need to get the hs packets for the relay not for the destination - myHsForThem := myControl.GetFromUDP(true) - relayHsForMe := relayControl.GetFromUDP(true) - - r.Log("Now inject both stage 1 handshake packets") - r.InjectUDPPacket(relayControl, myControl, relayHsForMe) - r.InjectUDPPacket(myControl, relayControl, myHsForThem) - - r.Log("Route for me until I send a message packet to relay") - r.RouteForAllUntilAfterMsgTypeTo(relayControl, header.Message, header.MessageNone) - - r.Log("My cached packet should be received by relay") - myCachedPacket := relayControl.GetFromTun(true) - assertUdpPacket(t, []byte("Hi from me"), myCachedPacket, myVpnIpNet.IP, relayVpnIpNet.IP, 80, 80) - - r.Log("Relays cached packet should be received by me") - relayCachedPacket := r.RouteForAllUntilTxTun(myControl) - assertUdpPacket(t, []byte("Hi from relay"), relayCachedPacket, relayVpnIpNet.IP, myVpnIpNet.IP, 80, 80) - - r.Log("Do a bidirectional tunnel test; me and relay") + r.Log("Get a tunnel between me and relay") assertTunnel(t, myVpnIpNet.IP, relayVpnIpNet.IP, myControl, relayControl, r) - r.Log("Create a tunnel between relay and them") + r.Log("Get a tunnel between them and relay") assertTunnel(t, theirVpnIpNet.IP, relayVpnIpNet.IP, theirControl, relayControl, r) - r.RenderHostmaps("Starting hostmaps", myControl, relayControl, theirControl) + r.Log("Trigger a handshake from both them and me via relay to them and me") + myControl.InjectTunUDPPacket(theirVpnIpNet.IP, 80, 80, []byte("Hi from me")) + theirControl.InjectTunUDPPacket(myVpnIpNet.IP, 80, 80, []byte("Hi from them")) - r.Log("Trigger a handshake to start from me to them via the relay") - //TODO: if we initiate a handshake from me and then assert the tunnel it will cause a relay control race that can blow up - // this is a problem that exists on master today - //myControl.InjectTunUDPPacket(theirVpnIpNet.IP, 80, 80, []byte("Hi from me")) - assertTunnel(t, myVpnIpNet.IP, theirVpnIpNet.IP, myControl, theirControl, r) + r.Log("Wait for a packet from them to me") + p := r.RouteForAllUntilTxTun(myControl) + _ = p myControl.Stop() theirControl.Stop() @@ -438,4 +415,93 @@ func TestStage1RaceRelays(t *testing.T) { ////TODO: assert hostmaps } +func TestStage1RaceRelays2(t *testing.T) { + //NOTE: this is a race between me and relay resulting in a full tunnel from me to them via relay + ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{}) + myControl, myVpnIpNet, myUdpAddr := newSimpleServer(ca, caKey, "me ", net.IP{10, 0, 0, 1}, m{"relay": m{"use_relays": true}}) + relayControl, relayVpnIpNet, relayUdpAddr := newSimpleServer(ca, caKey, "relay ", net.IP{10, 0, 0, 128}, m{"relay": m{"am_relay": true}}) + theirControl, theirVpnIpNet, theirUdpAddr := newSimpleServer(ca, caKey, "them ", net.IP{10, 0, 0, 2}, m{"relay": m{"use_relays": true}}) + l := NewTestLogger() + + // Teach my how to get to the relay and that their can be reached via the relay + myControl.InjectLightHouseAddr(relayVpnIpNet.IP, relayUdpAddr) + theirControl.InjectLightHouseAddr(relayVpnIpNet.IP, relayUdpAddr) + + myControl.InjectRelays(theirVpnIpNet.IP, []net.IP{relayVpnIpNet.IP}) + theirControl.InjectRelays(myVpnIpNet.IP, []net.IP{relayVpnIpNet.IP}) + + relayControl.InjectLightHouseAddr(theirVpnIpNet.IP, theirUdpAddr) + relayControl.InjectLightHouseAddr(myVpnIpNet.IP, myUdpAddr) + + // Build a router so we don't have to reason who gets which packet + r := router.NewR(t, myControl, relayControl, theirControl) + defer r.RenderFlow() + + // Start the servers + myControl.Start() + relayControl.Start() + theirControl.Start() + + r.Log("Get a tunnel between me and relay") + l.Info("Get a tunnel between me and relay") + assertTunnel(t, myVpnIpNet.IP, relayVpnIpNet.IP, myControl, relayControl, r) + + r.Log("Get a tunnel between them and relay") + l.Info("Get a tunnel between them and relay") + assertTunnel(t, theirVpnIpNet.IP, relayVpnIpNet.IP, theirControl, relayControl, r) + + r.Log("Trigger a handshake from both them and me via relay to them and me") + l.Info("Trigger a handshake from both them and me via relay to them and me") + myControl.InjectTunUDPPacket(theirVpnIpNet.IP, 80, 80, []byte("Hi from me")) + theirControl.InjectTunUDPPacket(myVpnIpNet.IP, 80, 80, []byte("Hi from them")) + + //r.RouteUntilAfterMsgType(myControl, header.Control, header.MessageNone) + //r.RouteUntilAfterMsgType(theirControl, header.Control, header.MessageNone) + + r.Log("Wait for a packet from them to me") + l.Info("Wait for a packet from them to me; myControl") + r.RouteForAllUntilTxTun(myControl) + l.Info("Wait for a packet from them to me; theirControl") + r.RouteForAllUntilTxTun(theirControl) + + r.Log("Assert the tunnel works") + l.Info("Assert the tunnel works") + assertTunnel(t, theirVpnIpNet.IP, myVpnIpNet.IP, theirControl, myControl, r) + + t.Log("Wait until we remove extra tunnels") + l.Info("Wait until we remove extra tunnels") + l.WithFields( + logrus.Fields{ + "myControl": len(myControl.GetHostmap().Indexes), + "theirControl": len(theirControl.GetHostmap().Indexes), + "relayControl": len(relayControl.GetHostmap().Indexes), + }).Info("Waiting for hostinfos to be removed...") + hostInfos := len(myControl.GetHostmap().Indexes) + len(theirControl.GetHostmap().Indexes) + len(relayControl.GetHostmap().Indexes) + retries := 60 + for hostInfos > 6 && retries > 0 { + hostInfos = len(myControl.GetHostmap().Indexes) + len(theirControl.GetHostmap().Indexes) + len(relayControl.GetHostmap().Indexes) + l.WithFields( + logrus.Fields{ + "myControl": len(myControl.GetHostmap().Indexes), + "theirControl": len(theirControl.GetHostmap().Indexes), + "relayControl": len(relayControl.GetHostmap().Indexes), + }).Info("Waiting for hostinfos to be removed...") + assertTunnel(t, myVpnIpNet.IP, theirVpnIpNet.IP, myControl, theirControl, r) + t.Log("Connection manager hasn't ticked yet") + time.Sleep(time.Second) + retries-- + } + + r.Log("Assert the tunnel works") + l.Info("Assert the tunnel works") + assertTunnel(t, theirVpnIpNet.IP, myVpnIpNet.IP, theirControl, myControl, r) + + myControl.Stop() + theirControl.Stop() + relayControl.Stop() + + // + ////TODO: assert hostmaps +} + //TODO: add a test with many lies diff --git a/e2e/helpers_test.go b/e2e/helpers_test.go index 3a2d7b5..ff1347f 100644 --- a/e2e/helpers_test.go +++ b/e2e/helpers_test.go @@ -77,6 +77,10 @@ func newSimpleServer(caCrt *cert.NebulaCertificate, caKey []byte, name string, u "timestamp_format": fmt.Sprintf("%v 15:04:05.000000", name), "level": l.Level.String(), }, + "timers": m{ + "pending_deletion_interval": 4, + "connection_alive_interval": 4, + }, } if overrides != nil { diff --git a/e2e/router/hostmap.go b/e2e/router/hostmap.go index 10627fc..120be69 100644 --- a/e2e/router/hostmap.go +++ b/e2e/router/hostmap.go @@ -63,10 +63,13 @@ func renderHostmap(c *nebula.Control) (string, []*edge) { r := fmt.Sprintf("\tsubgraph %s[\"%s (%s)\"]\n", clusterName, clusterName, clusterVpnIp) hm := c.GetHostmap() + hm.RLock() + defer hm.RUnlock() // Draw the vpn to index nodes r += fmt.Sprintf("\t\tsubgraph %s.hosts[\"Hosts (vpn ip to index)\"]\n", clusterName) - for _, vpnIp := range sortedHosts(hm.Hosts) { + hosts := sortedHosts(hm.Hosts) + for _, vpnIp := range hosts { hi := hm.Hosts[vpnIp] r += fmt.Sprintf("\t\t\t%v.%v[\"%v\"]\n", clusterName, vpnIp, vpnIp) lines = append(lines, fmt.Sprintf("%v.%v --> %v.%v", clusterName, vpnIp, clusterName, hi.GetLocalIndex())) @@ -94,12 +97,15 @@ func renderHostmap(c *nebula.Control) (string, []*edge) { // Draw the local index to relay or remote index nodes r += fmt.Sprintf("\t\tsubgraph indexes.%s[\"Indexes (index to hostinfo)\"]\n", clusterName) - for _, idx := range sortedIndexes(hm.Indexes) { - hi := hm.Indexes[idx] - r += fmt.Sprintf("\t\t\t%v.%v[\"%v (%v)\"]\n", clusterName, idx, idx, hi.GetVpnIp()) - remoteClusterName := strings.Trim(hi.GetCert().Details.Name, " ") - globalLines = append(globalLines, &edge{from: fmt.Sprintf("%v.%v", clusterName, idx), to: fmt.Sprintf("%v.%v", remoteClusterName, hi.GetRemoteIndex())}) - _ = hi + indexes := sortedIndexes(hm.Indexes) + for _, idx := range indexes { + hi, ok := hm.Indexes[idx] + if ok { + r += fmt.Sprintf("\t\t\t%v.%v[\"%v (%v)\"]\n", clusterName, idx, idx, hi.GetVpnIp()) + remoteClusterName := strings.Trim(hi.GetCert().Details.Name, " ") + globalLines = append(globalLines, &edge{from: fmt.Sprintf("%v.%v", clusterName, idx), to: fmt.Sprintf("%v.%v", remoteClusterName, hi.GetRemoteIndex())}) + _ = hi + } } r += "\t\tend\n" diff --git a/handshake_manager.go b/handshake_manager.go index 6a5a6e4..a324852 100644 --- a/handshake_manager.go +++ b/handshake_manager.go @@ -238,6 +238,12 @@ func (c *HandshakeManager) handleOutbound(vpnIp iputil.VpnIp, f udp.EncWriter, l Error("Failed to marshal Control message to create relay") } else { f.SendMessageToVpnIp(header.Control, 0, *relay, msg, make([]byte, 12), make([]byte, mtu)) + c.l.WithFields(logrus.Fields{ + "relayFrom": iputil.VpnIp(c.lightHouse.myVpnIp), + "relayTarget": iputil.VpnIp(vpnIp), + "initiatorIdx": existingRelay.LocalIndex, + "vpnIp": *relay}). + Info("send CreateRelayRequest") } default: hostinfo.logger(c.l). @@ -267,6 +273,12 @@ func (c *HandshakeManager) handleOutbound(vpnIp iputil.VpnIp, f udp.EncWriter, l Error("Failed to marshal Control message to create relay") } else { f.SendMessageToVpnIp(header.Control, 0, *relay, msg, make([]byte, 12), make([]byte, mtu)) + c.l.WithFields(logrus.Fields{ + "relayFrom": iputil.VpnIp(c.lightHouse.myVpnIp), + "relayTarget": iputil.VpnIp(vpnIp), + "initiatorIdx": idx, + "vpnIp": *relay}). + Info("send CreateRelayRequest") } } } @@ -333,7 +345,7 @@ func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket } // Is this a newer handshake? - if existingHostInfo.lastHandshakeTime >= hostinfo.lastHandshakeTime { + if existingHostInfo.lastHandshakeTime >= hostinfo.lastHandshakeTime && !existingHostInfo.ConnectionState.initiator { return existingHostInfo, ErrExistingHostInfo } diff --git a/hostmap.go b/hostmap.go index fc15556..a27a7f9 100644 --- a/hostmap.go +++ b/hostmap.go @@ -63,6 +63,9 @@ type HostMap struct { l *logrus.Logger } +// For synchronization, treat the pointed-to Relay struct as immutable. To edit the Relay +// struct, make a copy of an existing value, edit the fileds in the copy, and +// then store a pointer to the new copy in both realyForBy* maps. type RelayState struct { sync.RWMutex @@ -123,13 +126,43 @@ func (rs *RelayState) CopyRelayForIdxs() []uint32 { func (rs *RelayState) RemoveRelay(localIdx uint32) (iputil.VpnIp, bool) { rs.Lock() defer rs.Unlock() - relay, ok := rs.relayForByIdx[localIdx] + r, ok := rs.relayForByIdx[localIdx] if !ok { return iputil.VpnIp(0), false } delete(rs.relayForByIdx, localIdx) - delete(rs.relayForByIp, relay.PeerIp) - return relay.PeerIp, true + delete(rs.relayForByIp, r.PeerIp) + return r.PeerIp, true +} + +func (rs *RelayState) CompleteRelayByIP(vpnIp iputil.VpnIp, remoteIdx uint32) bool { + rs.Lock() + defer rs.Unlock() + r, ok := rs.relayForByIp[vpnIp] + if !ok { + return false + } + newRelay := *r + newRelay.State = Established + newRelay.RemoteIndex = remoteIdx + rs.relayForByIdx[r.LocalIndex] = &newRelay + rs.relayForByIp[r.PeerIp] = &newRelay + return true +} + +func (rs *RelayState) CompleteRelayByIdx(localIdx uint32, remoteIdx uint32) (*Relay, bool) { + rs.Lock() + defer rs.Unlock() + r, ok := rs.relayForByIdx[localIdx] + if !ok { + return nil, false + } + newRelay := *r + newRelay.State = Established + newRelay.RemoteIndex = remoteIdx + rs.relayForByIdx[r.LocalIndex] = &newRelay + rs.relayForByIp[r.PeerIp] = &newRelay + return &newRelay, true } func (rs *RelayState) QueryRelayForByIp(vpnIp iputil.VpnIp) (*Relay, bool) { @@ -145,6 +178,7 @@ func (rs *RelayState) QueryRelayForByIdx(idx uint32) (*Relay, bool) { r, ok := rs.relayForByIdx[idx] return r, ok } + func (rs *RelayState) InsertRelay(ip iputil.VpnIp, idx uint32, r *Relay) { rs.Lock() defer rs.Unlock() @@ -362,25 +396,27 @@ func (hm *HostMap) DeleteHostInfo(hostinfo *HostInfo) bool { hm.unlockedDeleteHostInfo(hostinfo) hm.Unlock() - // And tear down all the relays going through this host + // And tear down all the relays going through this host, if final for _, localIdx := range hostinfo.relayState.CopyRelayForIdxs() { hm.RemoveRelay(localIdx) } - // And tear down the relays this deleted hostInfo was using to be reached - teardownRelayIdx := []uint32{} - for _, relayIp := range hostinfo.relayState.CopyRelayIps() { - relayHostInfo, err := hm.QueryVpnIp(relayIp) - if err != nil { - hm.l.WithError(err).WithField("relay", relayIp).Info("Missing relay host in hostmap") - } else { - if r, ok := relayHostInfo.relayState.QueryRelayForByIp(hostinfo.vpnIp); ok { - teardownRelayIdx = append(teardownRelayIdx, r.LocalIndex) + if final { + // And tear down the relays this deleted hostInfo was using to be reached + teardownRelayIdx := []uint32{} + for _, relayIp := range hostinfo.relayState.CopyRelayIps() { + relayHostInfo, err := hm.QueryVpnIp(relayIp) + if err != nil { + hm.l.WithError(err).WithField("relay", relayIp).Info("Missing relay host in hostmap") + } else { + if r, ok := relayHostInfo.relayState.QueryRelayForByIp(hostinfo.vpnIp); ok { + teardownRelayIdx = append(teardownRelayIdx, r.LocalIndex) + } } } - } - for _, localIdx := range teardownRelayIdx { - hm.RemoveRelay(localIdx) + for _, localIdx := range teardownRelayIdx { + hm.RemoveRelay(localIdx) + } } return final @@ -486,6 +522,20 @@ func (hm *HostMap) QueryIndex(index uint32) (*HostInfo, error) { return nil, errors.New("unable to find index") } } + +// Retrieves a HostInfo by Index. Returns whether the HostInfo is primary at time of query. +// This helper exists so that the hostinfo.prev pointer can be read while the hostmap lock is held. +func (hm *HostMap) QueryIndexIsPrimary(index uint32) (*HostInfo, bool, error) { + //TODO: we probably just want to return bool instead of error, or at least a static error + hm.RLock() + if h, ok := hm.Indexes[index]; ok { + hm.RUnlock() + return h, h.prev == nil, nil + } else { + hm.RUnlock() + return nil, false, errors.New("unable to find index") + } +} func (hm *HostMap) QueryRelayIndex(index uint32) (*HostInfo, error) { //TODO: we probably just want to return bool instead of error, or at least a static error hm.RLock() diff --git a/hostmap_tester.go b/hostmap_tester.go index 1d4323f..0d5d41b 100644 --- a/hostmap_tester.go +++ b/hostmap_tester.go @@ -19,6 +19,6 @@ func (i *HostInfo) GetRemoteIndex() uint32 { return i.remoteIndexId } -func (i *HostInfo) GetRelayState() RelayState { - return i.relayState +func (i *HostInfo) GetRelayState() *RelayState { + return &i.relayState } diff --git a/outside.go b/outside.go index 8fa90be..ef547dc 100644 --- a/outside.go +++ b/outside.go @@ -89,12 +89,8 @@ func (f *Interface) readOutsidePackets(addr *udp.Addr, via interface{}, out []by relay, ok := hostinfo.relayState.QueryRelayForByIdx(h.RemoteIndex) if !ok { // The only way this happens is if hostmap has an index to the correct HostInfo, but the HostInfo is missing - // its internal mapping. This shouldn't happen! - hostinfo.logger(f.l).WithField("hostinfo", hostinfo.vpnIp).WithField("remoteIndex", h.RemoteIndex).Errorf("HostInfo missing remote index") - // Delete my local index from the hostmap - f.hostMap.DeleteRelayIdx(h.RemoteIndex) - // When the peer doesn't receive any return traffic, its connection_manager will eventually clean up - // the broken relay when it cleans up the associated HostInfo object. + // its internal mapping. This should never happen. + hostinfo.logger(f.l).WithFields(logrus.Fields{"vpnIp": hostinfo.vpnIp, "remoteIndex": h.RemoteIndex}).Error("HostInfo missing remote relay index") return } @@ -114,7 +110,7 @@ func (f *Interface) readOutsidePackets(addr *udp.Addr, via interface{}, out []by // find the target Relay info object targetRelay, ok := targetHI.relayState.QueryRelayForByIp(hostinfo.vpnIp) if !ok { - hostinfo.logger(f.l).WithField("peerIp", relay.PeerIp).Info("Failed to find relay in hostinfo") + hostinfo.logger(f.l).WithFields(logrus.Fields{"peerIp": relay.PeerIp, "vpnIp": hostinfo.vpnIp}).Info("Failed to find relay in hostinfo") return } @@ -130,7 +126,7 @@ func (f *Interface) readOutsidePackets(addr *udp.Addr, via interface{}, out []by hostinfo.logger(f.l).Error("Unexpected Relay Type of Terminal") } } else { - hostinfo.logger(f.l).WithField("targetRelayState", targetRelay.State).Info("Unexpected target relay state") + hostinfo.logger(f.l).WithFields(logrus.Fields{"peerIp": relay.PeerIp, "vpnIp": hostinfo.vpnIp, "targetRelayState": targetRelay.State}).Info("Unexpected target relay state") return } } diff --git a/overlay/tun_tester.go b/overlay/tun_tester.go index 442a9b5..38c11a6 100644 --- a/overlay/tun_tester.go +++ b/overlay/tun_tester.go @@ -50,7 +50,7 @@ func newTunFromFd(_ *logrus.Logger, _ int, _ *net.IPNet, _ int, _ []Route, _ int // These are unencrypted ip layer frames destined for another nebula node. // packets should exit the udp side, capture them with udpConn.Get func (t *TestTun) Send(packet []byte) { - if t.l.Level >= logrus.InfoLevel { + if t.l.Level >= logrus.DebugLevel { t.l.WithField("dataLen", len(packet)).Debug("Tun receiving injected packet") } t.rxPackets <- packet diff --git a/relay_manager.go b/relay_manager.go index 080d144..3b4c904 100644 --- a/relay_manager.go +++ b/relay_manager.go @@ -88,17 +88,14 @@ func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp iput // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic. func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) { - relay, ok := relayHostInfo.relayState.QueryRelayForByIdx(m.InitiatorRelayIndex) + relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex) if !ok { rm.l.WithFields(logrus.Fields{"relayHostInfo": relayHostInfo.vpnIp, "initiatorRelayIndex": m.InitiatorRelayIndex, "relayFrom": m.RelayFromIp, - "relayTo": m.RelayToIp}).Info("relayManager EstablishRelay relayForByIdx not found") + "relayTo": m.RelayToIp}).Info("relayManager failed to update relay") return nil, fmt.Errorf("unknown relay") } - // relay deserves some synchronization - relay.RemoteIndex = m.ResponderRelayIndex - relay.State = Established return relay, nil } @@ -120,7 +117,7 @@ func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m * "relayTarget": iputil.VpnIp(m.RelayToIp), "initiatorIdx": m.InitiatorRelayIndex, "responderIdx": m.ResponderRelayIndex, - "hostInfo": h.vpnIp}). + "vpnIp": h.vpnIp}). Info("handleCreateRelayResponse") target := iputil.VpnIp(m.RelayToIp) @@ -155,44 +152,63 @@ func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m * msg, err := resp.Marshal() if err != nil { rm.l. - WithError(err).Error("relayManager Failed to marhsal Control CreateRelayResponse message to create relay") + WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay") } else { f.SendMessageToVpnIp(header.Control, 0, peerHostInfo.vpnIp, msg, make([]byte, 12), make([]byte, mtu)) + rm.l.WithFields(logrus.Fields{ + "relayFrom": iputil.VpnIp(resp.RelayFromIp), + "relayTarget": iputil.VpnIp(resp.RelayToIp), + "initiatorIdx": resp.InitiatorRelayIndex, + "responderIdx": resp.ResponderRelayIndex, + "vpnIp": peerHostInfo.vpnIp}). + Info("send CreateRelayResponse") } } func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) { - rm.l.WithFields(logrus.Fields{ - "relayFrom": iputil.VpnIp(m.RelayFromIp), - "relayTarget": iputil.VpnIp(m.RelayToIp), - "initiatorIdx": m.InitiatorRelayIndex, - "hostInfo": h.vpnIp}). - Info("handleCreateRelayRequest") + from := iputil.VpnIp(m.RelayFromIp) target := iputil.VpnIp(m.RelayToIp) + + logMsg := rm.l.WithFields(logrus.Fields{ + "relayFrom": from, + "relayTarget": target, + "initiatorIdx": m.InitiatorRelayIndex, + "vpnIp": h.vpnIp}) + + logMsg.Info("handleCreateRelayRequest") // Is the target of the relay me? if target == f.myVpnIp { existingRelay, ok := h.relayState.QueryRelayForByIp(from) - addRelay := !ok if ok { - // Clean up existing relay, if this is a new request. - if existingRelay.RemoteIndex != m.InitiatorRelayIndex { - // We got a brand new Relay request, because its index is different than what we saw before. - // Clean up the existing Relay state, and get ready to record new Relay state. - rm.hostmap.RemoveRelay(existingRelay.LocalIndex) - addRelay = true + switch existingRelay.State { + case Requested: + ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex) + if !ok { + logMsg.Error("Relay State not found") + return + } + case Established: + if existingRelay.RemoteIndex != m.InitiatorRelayIndex { + // We got a brand new Relay request, because its index is different than what we saw before. + // This should never happen. The peer should never change an index, once created. + logMsg.WithFields(logrus.Fields{ + "existingRemoteIdx": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest") + return + } } - } - if addRelay { + } else { _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established) if err != nil { + logMsg.WithError(err).Error("Failed to add relay") return } } relay, ok := h.relayState.QueryRelayForByIp(from) - if ok && m.InitiatorRelayIndex != relay.RemoteIndex { - // Do something, Something happened. + if !ok { + logMsg.Error("Relay State not found") + return } resp := NebulaControl{ @@ -204,15 +220,22 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N } msg, err := resp.Marshal() if err != nil { - rm.l. + logMsg. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay") } else { f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu)) + rm.l.WithFields(logrus.Fields{ + "relayFrom": iputil.VpnIp(resp.RelayFromIp), + "relayTarget": iputil.VpnIp(resp.RelayToIp), + "initiatorIdx": resp.InitiatorRelayIndex, + "responderIdx": resp.ResponderRelayIndex, + "vpnIp": h.vpnIp}). + Info("send CreateRelayResponse") } return } else { // the target is not me. Create a relay to the target, from me. - if rm.GetAmRelay() == false { + if !rm.GetAmRelay() { return } peer, err := rm.hostmap.QueryVpnIp(target) @@ -252,10 +275,17 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N } msg, err := req.Marshal() if err != nil { - rm.l. + logMsg. WithError(err).Error("relayManager Failed to marshal Control message to create relay") } else { f.SendMessageToVpnIp(header.Control, 0, target, msg, make([]byte, 12), make([]byte, mtu)) + rm.l.WithFields(logrus.Fields{ + "relayFrom": iputil.VpnIp(req.RelayFromIp), + "relayTarget": iputil.VpnIp(req.RelayToIp), + "initiatorIdx": req.InitiatorRelayIndex, + "responderIdx": req.ResponderRelayIndex, + "vpnIp": target}). + Info("send CreateRelayRequest") } } // Also track the half-created Relay state just received @@ -268,24 +298,20 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N } _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state) if err != nil { - rm.l. + logMsg. WithError(err).Error("relayManager Failed to allocate a local index for relay") return } } else { - if relay.RemoteIndex != m.InitiatorRelayIndex { - // This is a stale Relay entry for the same tunnel targets. - // Clean up the existing stuff. - rm.RemoveRelay(relay.LocalIndex) - // Add the new relay - _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, Requested) - if err != nil { - return - } - relay, _ = h.relayState.QueryRelayForByIp(target) - } switch relay.State { case Established: + if relay.RemoteIndex != m.InitiatorRelayIndex { + // We got a brand new Relay request, because its index is different than what we saw before. + // This should never happen. The peer should never change an index, once created. + logMsg.WithFields(logrus.Fields{ + "existingRemoteIdx": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest") + return + } resp := NebulaControl{ Type: NebulaControl_CreateRelayResponse, ResponderRelayIndex: relay.LocalIndex, @@ -299,6 +325,13 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay") } else { f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu)) + rm.l.WithFields(logrus.Fields{ + "relayFrom": iputil.VpnIp(resp.RelayFromIp), + "relayTarget": iputil.VpnIp(resp.RelayToIp), + "initiatorIdx": resp.InitiatorRelayIndex, + "responderIdx": resp.ResponderRelayIndex, + "vpnIp": h.vpnIp}). + Info("send CreateRelayResponse") } case Requested: diff --git a/udp/udp_tester.go b/udp/udp_tester.go index b3e2498..62e4f56 100644 --- a/udp/udp_tester.go +++ b/udp/udp_tester.go @@ -62,7 +62,7 @@ func (u *Conn) Send(packet *Packet) { if err := h.Parse(packet.Data); err != nil { panic(err) } - if u.l.Level >= logrus.InfoLevel { + if u.l.Level >= logrus.DebugLevel { u.l.WithField("header", h). WithField("udpAddr", fmt.Sprintf("%v:%v", packet.FromIp, packet.FromPort)). WithField("dataLen", len(packet.Data)).