removed singleton eventbus. Moved to application init.

Added interface for eventbus.
verified working.
This commit is contained in:
Jason Kulatunga 2023-09-20 13:57:12 -07:00
parent 85986cf95c
commit b6da493d62
14 changed files with 175 additions and 161 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/analogj/go-util/utils"
"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
"github.com/fastenhealth/fasten-onprem/backend/pkg/errors"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/version"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web"
"github.com/sirupsen/logrus"
@ -104,7 +105,11 @@ func main() {
settingsData, err := json.Marshal(appconfig.AllSettings())
appLogger.Debug(string(settingsData), err)
webServer := web.AppEngine{Config: appconfig, Logger: appLogger}
webServer := web.AppEngine{
Config: appconfig,
Logger: appLogger,
EventBus: event_bus.NewEventBusServer(appLogger),
}
return webServer.Start()
},

View File

@ -8,9 +8,7 @@ const (
ContextKeyTypeConfig string = "CONFIG"
ContextKeyTypeDatabase string = "REPOSITORY"
ContextKeyTypeLogger string = "LOGGER"
ContextKeyTypeSSEEventBusServer string = "SSE_EVENT_BUS_SERVER"
ContextKeyTypeSSEClientChannel string = "SSE_CLIENT_CHANNEL"
ContextKeyTypeEventBusServer string = "EVENT_BUS_SERVER"
ContextKeyTypeAuthUsername string = "AUTH_USERNAME"
ContextKeyTypeAuthToken string = "AUTH_TOKEN"

View File

@ -22,7 +22,7 @@ import (
"strings"
)
func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) (DatabaseRepository, error) {
func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) (DatabaseRepository, error) {
//backgroundContext := context.Background()
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -45,7 +45,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger)
})
database, err := gorm.Open(sqlite.Open(appConfig.GetString("database.location")+pragmaStr), &gorm.Config{
//TODO: figure out how to log database queries again.
//Logger: Logger
//logger: logger
DisableForeignKeyConstraintWhenMigrating: true,
})
@ -62,7 +62,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger)
AppConfig: appConfig,
Logger: globalLogger,
GormClient: database,
EventBus: event_bus.GetEventBusServer(globalLogger),
EventBus: eventBus,
}
//TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized.
@ -95,7 +95,7 @@ type SqliteRepository struct {
GormClient *gorm.DB
EventBus *event_bus.EventBus
EventBus event_bus.Interface
}
func (sr *SqliteRepository) Migrate() error {
@ -366,7 +366,7 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM
err = sr.EventBus.PublishMessage(eventSourceSync)
if err != nil {
sr.Logger.Warnf("ignoring: an error occurred while publishing event to EventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err)
sr.Logger.Warnf("ignoring: an error occurred while publishing event to eventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err)
}
createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{

View File

@ -2,76 +2,27 @@ package event_bus
import (
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/sirupsen/logrus"
"log"
"sync"
)
var eventBusLock = &sync.Mutex{}
var singletonEventBusInstance *EventBus
// New event messages are broadcast to all registered client connection channels
// TODO: change this to be use specific channels.
type ClientChan chan string
// Get a reference to the EventBus singleton Start procnteessing requests
// this should be a singleton, to ensure that we're always broadcasting to the same clients
// see: https://refactoring.guru/design-patterns/singleton/go/example
func GetEventBusServer(logger logrus.FieldLogger) *EventBus {
if singletonEventBusInstance == nil {
eventBusLock.Lock()
defer eventBusLock.Unlock()
if singletonEventBusInstance == nil {
fmt.Println("Creating single instance now.")
singletonEventBusInstance = &EventBus{
Logger: logger,
Message: make(chan EventBusMessage),
NewListener: make(chan *EventBusListener),
ClosedListener: make(chan *EventBusListener),
TotalRoomListeners: make(map[string][]*EventBusListener),
}
// Start processing requests
go singletonEventBusInstance.listen()
//background keep-alive for testing
//go func() {
// for {
// time.Sleep(time.Second * 10)
// // Send current time
// singletonEventBusInstance.PublishMessage(models.NewEventKeepAlive("keep-alive"))
// }
//}()
} else {
fmt.Println("Single instance already created.")
}
} else {
fmt.Println("Single instance already created.")
}
return singletonEventBusInstance
}
// It keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type EventBus struct {
Logger logrus.FieldLogger
type eventBus struct {
logger logrus.FieldLogger
// Events are pushed to this channel by the main events-gathering routine
Message chan EventBusMessage
message chan EventBusMessage
// New client connections
NewListener chan *EventBusListener
newListener chan *EventBusListener
// Closed client connections
ClosedListener chan *EventBusListener
closedListener chan *EventBusListener
// Total client connections
TotalRoomListeners map[string][]*EventBusListener
totalRoomListeners map[string][]*EventBusListener
}
type EventBusListener struct {
@ -87,42 +38,42 @@ type EventBusMessage struct {
// It Listens all incoming requests from clients.
// Handles addition and removal of clients and broadcast messages to clients.
// TODO: determine how to route messages based on authenticated client
func (bus *EventBus) listen() {
func (bus *eventBus) listen() {
for {
select {
// Add new available client
case listener := <-bus.NewListener:
case listener := <-bus.newListener:
//check if this userId room already exists, or create it
if _, exists := bus.TotalRoomListeners[listener.UserID]; !exists {
bus.TotalRoomListeners[listener.UserID] = []*EventBusListener{}
if _, exists := bus.totalRoomListeners[listener.UserID]; !exists {
bus.totalRoomListeners[listener.UserID] = []*EventBusListener{}
}
bus.TotalRoomListeners[listener.UserID] = append(bus.TotalRoomListeners[listener.UserID], listener)
log.Printf("Listener added to room: `%s`. %d registered listeners", listener.UserID, len(bus.TotalRoomListeners[listener.UserID]))
bus.totalRoomListeners[listener.UserID] = append(bus.totalRoomListeners[listener.UserID], listener)
log.Printf("Listener added to room: `%s`. %d registered listeners", listener.UserID, len(bus.totalRoomListeners[listener.UserID]))
// Remove closed client
case listener := <-bus.ClosedListener:
if _, exists := bus.TotalRoomListeners[listener.UserID]; !exists {
case listener := <-bus.closedListener:
if _, exists := bus.totalRoomListeners[listener.UserID]; !exists {
log.Printf("Room `%s` not found", listener.UserID)
continue
} else {
//loop through all the listeners in the room and remove the one that matches
for i, v := range bus.TotalRoomListeners[listener.UserID] {
for i, v := range bus.totalRoomListeners[listener.UserID] {
if v.ResponseChan == listener.ResponseChan {
bus.TotalRoomListeners[listener.UserID] = append(bus.TotalRoomListeners[listener.UserID][:i], bus.TotalRoomListeners[listener.UserID][i+1:]...)
bus.totalRoomListeners[listener.UserID] = append(bus.totalRoomListeners[listener.UserID][:i], bus.totalRoomListeners[listener.UserID][i+1:]...)
close(listener.ResponseChan)
log.Printf("Removed listener from room: `%s`. %d registered clients", listener.UserID, len(bus.TotalRoomListeners[listener.UserID]))
log.Printf("Removed listener from room: `%s`. %d registered clients", listener.UserID, len(bus.totalRoomListeners[listener.UserID]))
break
}
}
}
// Broadcast message to client
case eventMsg := <-bus.Message:
if _, exists := bus.TotalRoomListeners[eventMsg.UserID]; !exists {
case eventMsg := <-bus.message:
if _, exists := bus.totalRoomListeners[eventMsg.UserID]; !exists {
log.Printf("Room `%s` not found, could not send message: `%s`", eventMsg.UserID, eventMsg.Message)
continue
} else {
for _, roomListener := range bus.TotalRoomListeners[eventMsg.UserID] {
for _, roomListener := range bus.totalRoomListeners[eventMsg.UserID] {
roomListener.ResponseChan <- eventMsg.Message
}
}
@ -131,15 +82,34 @@ func (bus *EventBus) listen() {
}
}
func (bus *EventBus) PublishMessage(eventMsg models.EventInterface) error {
bus.Logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID())
func (bus *eventBus) PublishMessage(eventMsg models.EventInterface) error {
bus.logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID())
payload, err := json.Marshal(eventMsg)
if err != nil {
return err
}
bus.Message <- EventBusMessage{
bus.message <- EventBusMessage{
UserID: eventMsg.GetUserID(),
Message: string(payload),
}
return nil
}
func (bus *eventBus) AddListener(listener *EventBusListener) {
bus.newListener <- listener
}
func (bus *eventBus) RemoveListener(listener *EventBusListener) {
bus.closedListener <- listener
}
func (bus *eventBus) TotalRooms() int {
return len(bus.totalRoomListeners)
}
func (bus *eventBus) TotalListenersByRoom(room string) int {
listeners, ok := bus.totalRoomListeners[room]
if !ok {
return 0
} else {
return len(listeners)
}
}

View File

@ -0,0 +1,15 @@
package event_bus
import (
"github.com/stretchr/testify/require"
"testing"
)
func TestEventBusInterface(t *testing.T) {
t.Parallel()
eventBusInstance := new(eventBus)
//assert
require.Implements(t, (*Interface)(nil), eventBusInstance, "should implement the eventBus interface")
}

View File

@ -0,0 +1,21 @@
package event_bus
import (
"fmt"
"github.com/sirupsen/logrus"
)
func NewEventBusServer(logger logrus.FieldLogger) Interface {
fmt.Println("Creating event bus instance now.")
eventBusInstance := &eventBus{
logger: logger,
message: make(chan EventBusMessage),
newListener: make(chan *EventBusListener),
closedListener: make(chan *EventBusListener),
totalRoomListeners: make(map[string][]*EventBusListener),
}
// Start processing requests
go eventBusInstance.listen()
return eventBusInstance
}

View File

@ -0,0 +1,12 @@
package event_bus
import "github.com/fastenhealth/fasten-onprem/backend/pkg/models"
//go:generate mockgen -source=interface.go -destination=mock/mock_event_bus.go
type Interface interface {
PublishMessage(eventMsg models.EventInterface) error
AddListener(listener *EventBusListener)
RemoveListener(listener *EventBusListener)
TotalRooms() int
TotalListenersByRoom(room string) int
}

View File

@ -1,11 +1,12 @@
package handler
import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
"io"
"log"
)
// SSEStream is a handler for the server sent event stream (notifications from background processes)
@ -14,25 +15,41 @@ import (
//
// test using:
// curl -N -H "Authorization: Bearer xxxxx" http://localhost:9090/api/secure/sse/stream
func SSEStream(c *gin.Context) {
//logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry)
//databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository)
v, ok := c.Get(pkg.ContextKeyTypeSSEClientChannel)
if !ok {
log.Printf("could not get client channel from context")
func SSEEventBusServerHandler(eventBus event_bus.Interface) gin.HandlerFunc {
return func(c *gin.Context) {
//get a reference to the current user
databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository)
foundUser, err := databaseRepo.GetCurrentUser(c)
if err != nil || foundUser == nil {
c.Error(fmt.Errorf("could not find user"))
return
}
listener, ok := v.(*event_bus.EventBusListener)
if !ok {
return
// Initialize client channel
clientListener := event_bus.EventBusListener{
ResponseChan: make(chan string),
UserID: foundUser.ID.String(),
}
// Send new connection to event server
eventBus.AddListener(&clientListener)
defer func() {
// Send closed connection to event server
eventBus.RemoveListener(&clientListener)
}()
c.Stream(func(w io.Writer) bool {
// Stream message to client from message channel
if msg, ok := <-listener.ResponseChan; ok {
if msg, ok := <-clientListener.ResponseChan; ok {
c.SSEvent("message", msg)
return true
}
return false
})
}
}

View File

@ -156,6 +156,7 @@ func CreateSource(c *gin.Context) {
func SourceSync(c *gin.Context) {
logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry)
databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository)
eventBus := c.MustGet(pkg.ContextKeyTypeEventBusServer).(event_bus.Interface)
logger.Infof("Get SourceCredential Credentials: %v", c.Param("sourceId"))
@ -175,7 +176,7 @@ func SourceSync(c *gin.Context) {
//publish event
currentUser, _ := databaseRepo.GetCurrentUser(c)
err = event_bus.GetEventBusServer(logger).PublishMessage(
err = eventBus.PublishMessage(
models.NewEventSourceComplete(
currentUser.ID.String(),
sourceCred.ID.String(),
@ -191,6 +192,7 @@ func SourceSync(c *gin.Context) {
func CreateManualSource(c *gin.Context) {
logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry)
databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository)
eventBus := c.MustGet(pkg.ContextKeyTypeEventBusServer).(event_bus.Interface)
// single file
file, err := c.FormFile("file")
@ -260,7 +262,7 @@ func CreateManualSource(c *gin.Context) {
//publish event
currentUser, _ := databaseRepo.GetCurrentUser(c)
err = event_bus.GetEventBusServer(logger).PublishMessage(
err = eventBus.PublishMessage(
models.NewEventSourceComplete(
currentUser.ID.String(),
manualSourceCredential.ID.String(),

View File

@ -4,13 +4,14 @@ import (
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
func RepositoryMiddleware(appConfig config.Interface, globalLogger logrus.FieldLogger) gin.HandlerFunc {
func RepositoryMiddleware(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) gin.HandlerFunc {
deviceRepo, err := database.NewRepository(appConfig, globalLogger)
deviceRepo, err := database.NewRepository(appConfig, globalLogger, eventBus)
if err != nil {
panic(err)
}

View File

@ -0,0 +1,15 @@
package middleware
import (
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
)
func EventBusMiddleware(eventBus event_bus.Interface) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set(pkg.ContextKeyTypeEventBusServer, eventBus)
c.Next()
}
}

View File

@ -28,7 +28,7 @@ import (
var timeFormat = "02/Jan/2006:15:04:05 -0700"
// Logger is the logrus logger handler
// logger is the logrus logger handler
func LoggerMiddleware(logger *logrus.Entry) gin.HandlerFunc {
hostname, err := os.Hostname()

View File

@ -1,12 +1,7 @@
package middleware
import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
func SSEHeaderMiddleware() gin.HandlerFunc {
@ -18,39 +13,3 @@ func SSEHeaderMiddleware() gin.HandlerFunc {
c.Next()
}
}
func SSEEventBusServerMiddleware(logger *logrus.Entry) gin.HandlerFunc {
// get reference to streaming server singleton
bus := event_bus.GetEventBusServer(logger)
return func(c *gin.Context) {
//get a reference to the current user
databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository)
foundUser, err := databaseRepo.GetCurrentUser(c)
if err != nil || foundUser == nil {
c.Error(fmt.Errorf("could not find user"))
return
}
// Initialize client channel
clientListener := event_bus.EventBusListener{
ResponseChan: make(chan string),
UserID: foundUser.ID.String(),
}
// Send new connection to event server
bus.NewListener <- &clientListener
defer func() {
// Send closed connection to event server
bus.ClosedListener <- &clientListener
}()
c.Set(pkg.ContextKeyTypeSSEEventBusServer, &bus)
c.Set(pkg.ContextKeyTypeSSEClientChannel, &clientListener)
c.Next()
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/handler"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware"
"github.com/gin-gonic/gin"
@ -17,14 +18,16 @@ import (
type AppEngine struct {
Config config.Interface
Logger *logrus.Entry
EventBus event_bus.Interface
}
func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
r := gin.New()
r.Use(middleware.LoggerMiddleware(ae.Logger))
r.Use(middleware.RepositoryMiddleware(ae.Config, ae.Logger))
r.Use(middleware.RepositoryMiddleware(ae.Config, ae.Logger, ae.EventBus))
r.Use(middleware.ConfigMiddleware(ae.Config))
r.Use(middleware.EventBusMiddleware(ae.EventBus))
r.Use(gin.Recovery())
basePath := ae.Config.GetString("web.listen.basepath")
@ -39,14 +42,11 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
// check if the /web folder is populated.
// check if access to database
bus := event_bus.GetEventBusServer(ae.Logger)
bus.Message <- event_bus.EventBusMessage{
UserID: "heartbeat",
Message: "sse heartbeat",
}
keepAliveMsg := models.NewEventKeepAlive("heartbeat")
err := ae.EventBus.PublishMessage(keepAliveMsg)
c.JSON(http.StatusOK, gin.H{
"success": true,
"success": err == nil,
})
})
@ -84,8 +84,7 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
if runtime.GOOS != "windows" {
secure.GET("/events/stream",
middleware.SSEHeaderMiddleware(),
middleware.SSEEventBusServerMiddleware(ae.Logger),
handler.SSEStream,
handler.SSEEventBusServerHandler(ae.EventBus),
)
}
}