fasten-onprem/backend/pkg/event_bus/event_bus.go

116 lines
3.4 KiB
Go

package event_bus
import (
"encoding/json"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/sirupsen/logrus"
"log"
)
// It keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type eventBus struct {
logger logrus.FieldLogger
// Events are pushed to this channel by the main events-gathering routine
message chan EventBusMessage
// New client connections
newListener chan *EventBusListener
// Closed client connections
closedListener chan *EventBusListener
// Total client connections
totalRoomListeners map[string][]*EventBusListener
}
type EventBusListener struct {
ResponseChan chan string
UserID string
}
type EventBusMessage struct {
UserID string
Message string
}
// It Listens all incoming requests from clients.
// Handles addition and removal of clients and broadcast messages to clients.
// TODO: determine how to route messages based on authenticated client
func (bus *eventBus) listen() {
for {
select {
// Add new available client
case listener := <-bus.newListener:
//check if this userId room already exists, or create it
if _, exists := bus.totalRoomListeners[listener.UserID]; !exists {
bus.totalRoomListeners[listener.UserID] = []*EventBusListener{}
}
bus.totalRoomListeners[listener.UserID] = append(bus.totalRoomListeners[listener.UserID], listener)
log.Printf("Listener added to room: `%s`. %d registered listeners", listener.UserID, len(bus.totalRoomListeners[listener.UserID]))
// Remove closed client
case listener := <-bus.closedListener:
if _, exists := bus.totalRoomListeners[listener.UserID]; !exists {
log.Printf("Room `%s` not found", listener.UserID)
continue
} else {
//loop through all the listeners in the room and remove the one that matches
for i, v := range bus.totalRoomListeners[listener.UserID] {
if v.ResponseChan == listener.ResponseChan {
bus.totalRoomListeners[listener.UserID] = append(bus.totalRoomListeners[listener.UserID][:i], bus.totalRoomListeners[listener.UserID][i+1:]...)
close(listener.ResponseChan)
log.Printf("Removed listener from room: `%s`. %d registered clients", listener.UserID, len(bus.totalRoomListeners[listener.UserID]))
break
}
}
}
// Broadcast message to client
case eventMsg := <-bus.message:
if _, exists := bus.totalRoomListeners[eventMsg.UserID]; !exists {
log.Printf("Room `%s` not found, could not send message: `%s`", eventMsg.UserID, eventMsg.Message)
continue
} else {
for _, roomListener := range bus.totalRoomListeners[eventMsg.UserID] {
roomListener.ResponseChan <- eventMsg.Message
}
}
}
}
}
func (bus *eventBus) PublishMessage(eventMsg models.EventInterface) error {
bus.logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID())
payload, err := json.Marshal(eventMsg)
if err != nil {
return err
}
bus.message <- EventBusMessage{
UserID: eventMsg.GetUserID(),
Message: string(payload),
}
return nil
}
func (bus *eventBus) AddListener(listener *EventBusListener) {
bus.newListener <- listener
}
func (bus *eventBus) RemoveListener(listener *EventBusListener) {
bus.closedListener <- listener
}
func (bus *eventBus) TotalRooms() int {
return len(bus.totalRoomListeners)
}
func (bus *eventBus) TotalListenersByRoom(room string) int {
listeners, ok := bus.totalRoomListeners[room]
if !ok {
return 0
} else {
return len(listeners)
}
}