diff --git a/backend/cmd/fasten/fasten.go b/backend/cmd/fasten/fasten.go index f1e3b352..c7cd6130 100644 --- a/backend/cmd/fasten/fasten.go +++ b/backend/cmd/fasten/fasten.go @@ -6,6 +6,7 @@ import ( "github.com/analogj/go-util/utils" "github.com/fastenhealth/fasten-onprem/backend/pkg/config" "github.com/fastenhealth/fasten-onprem/backend/pkg/errors" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/fastenhealth/fasten-onprem/backend/pkg/version" "github.com/fastenhealth/fasten-onprem/backend/pkg/web" "github.com/sirupsen/logrus" @@ -104,7 +105,11 @@ func main() { settingsData, err := json.Marshal(appconfig.AllSettings()) appLogger.Debug(string(settingsData), err) - webServer := web.AppEngine{Config: appconfig, Logger: appLogger} + webServer := web.AppEngine{ + Config: appconfig, + Logger: appLogger, + EventBus: event_bus.NewEventBusServer(appLogger), + } return webServer.Start() }, diff --git a/backend/pkg/constants.go b/backend/pkg/constants.go index faaaef3f..05e5b0e7 100644 --- a/backend/pkg/constants.go +++ b/backend/pkg/constants.go @@ -5,12 +5,10 @@ type ResourceGraphType string const ( ResourceListPageSize int = 20 - ContextKeyTypeConfig string = "CONFIG" - ContextKeyTypeDatabase string = "REPOSITORY" - ContextKeyTypeLogger string = "LOGGER" - - ContextKeyTypeSSEEventBusServer string = "SSE_EVENT_BUS_SERVER" - ContextKeyTypeSSEClientChannel string = "SSE_CLIENT_CHANNEL" + ContextKeyTypeConfig string = "CONFIG" + ContextKeyTypeDatabase string = "REPOSITORY" + ContextKeyTypeLogger string = "LOGGER" + ContextKeyTypeEventBusServer string = "EVENT_BUS_SERVER" ContextKeyTypeAuthUsername string = "AUTH_USERNAME" ContextKeyTypeAuthToken string = "AUTH_TOKEN" diff --git a/backend/pkg/database/sqlite_repository.go b/backend/pkg/database/sqlite_repository.go index 324eef01..5cca131f 100644 --- a/backend/pkg/database/sqlite_repository.go +++ b/backend/pkg/database/sqlite_repository.go @@ -22,7 +22,7 @@ import ( "strings" ) -func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) (DatabaseRepository, error) { +func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) (DatabaseRepository, error) { //backgroundContext := context.Background() //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -45,7 +45,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) }) database, err := gorm.Open(sqlite.Open(appConfig.GetString("database.location")+pragmaStr), &gorm.Config{ //TODO: figure out how to log database queries again. - //Logger: Logger + //logger: logger DisableForeignKeyConstraintWhenMigrating: true, }) @@ -62,7 +62,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) AppConfig: appConfig, Logger: globalLogger, GormClient: database, - EventBus: event_bus.GetEventBusServer(globalLogger), + EventBus: eventBus, } //TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized. @@ -95,7 +95,7 @@ type SqliteRepository struct { GormClient *gorm.DB - EventBus *event_bus.EventBus + EventBus event_bus.Interface } func (sr *SqliteRepository) Migrate() error { @@ -366,7 +366,7 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM err = sr.EventBus.PublishMessage(eventSourceSync) if err != nil { - sr.Logger.Warnf("ignoring: an error occurred while publishing event to EventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) + sr.Logger.Warnf("ignoring: an error occurred while publishing event to eventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) } createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{ diff --git a/backend/pkg/event_bus/event_bus.go b/backend/pkg/event_bus/event_bus.go index cf4f6d1e..e148daff 100644 --- a/backend/pkg/event_bus/event_bus.go +++ b/backend/pkg/event_bus/event_bus.go @@ -2,76 +2,27 @@ package event_bus import ( "encoding/json" - "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg/models" "github.com/sirupsen/logrus" "log" - "sync" ) -var eventBusLock = &sync.Mutex{} - -var singletonEventBusInstance *EventBus - -// New event messages are broadcast to all registered client connection channels -// TODO: change this to be use specific channels. -type ClientChan chan string - -// Get a reference to the EventBus singleton Start procnteessing requests -// this should be a singleton, to ensure that we're always broadcasting to the same clients -// see: https://refactoring.guru/design-patterns/singleton/go/example -func GetEventBusServer(logger logrus.FieldLogger) *EventBus { - if singletonEventBusInstance == nil { - eventBusLock.Lock() - defer eventBusLock.Unlock() - if singletonEventBusInstance == nil { - fmt.Println("Creating single instance now.") - singletonEventBusInstance = &EventBus{ - Logger: logger, - Message: make(chan EventBusMessage), - NewListener: make(chan *EventBusListener), - ClosedListener: make(chan *EventBusListener), - TotalRoomListeners: make(map[string][]*EventBusListener), - } - - // Start processing requests - go singletonEventBusInstance.listen() - - //background keep-alive for testing - //go func() { - // for { - // time.Sleep(time.Second * 10) - // // Send current time - // singletonEventBusInstance.PublishMessage(models.NewEventKeepAlive("keep-alive")) - // } - //}() - - } else { - fmt.Println("Single instance already created.") - } - } else { - fmt.Println("Single instance already created.") - } - - return singletonEventBusInstance -} - // It keeps a list of clients those are currently attached // and broadcasting events to those clients. -type EventBus struct { - Logger logrus.FieldLogger +type eventBus struct { + logger logrus.FieldLogger // Events are pushed to this channel by the main events-gathering routine - Message chan EventBusMessage + message chan EventBusMessage // New client connections - NewListener chan *EventBusListener + newListener chan *EventBusListener // Closed client connections - ClosedListener chan *EventBusListener + closedListener chan *EventBusListener // Total client connections - TotalRoomListeners map[string][]*EventBusListener + totalRoomListeners map[string][]*EventBusListener } type EventBusListener struct { @@ -87,42 +38,42 @@ type EventBusMessage struct { // It Listens all incoming requests from clients. // Handles addition and removal of clients and broadcast messages to clients. // TODO: determine how to route messages based on authenticated client -func (bus *EventBus) listen() { +func (bus *eventBus) listen() { for { select { // Add new available client - case listener := <-bus.NewListener: + case listener := <-bus.newListener: //check if this userId room already exists, or create it - if _, exists := bus.TotalRoomListeners[listener.UserID]; !exists { - bus.TotalRoomListeners[listener.UserID] = []*EventBusListener{} + if _, exists := bus.totalRoomListeners[listener.UserID]; !exists { + bus.totalRoomListeners[listener.UserID] = []*EventBusListener{} } - bus.TotalRoomListeners[listener.UserID] = append(bus.TotalRoomListeners[listener.UserID], listener) - log.Printf("Listener added to room: `%s`. %d registered listeners", listener.UserID, len(bus.TotalRoomListeners[listener.UserID])) + bus.totalRoomListeners[listener.UserID] = append(bus.totalRoomListeners[listener.UserID], listener) + log.Printf("Listener added to room: `%s`. %d registered listeners", listener.UserID, len(bus.totalRoomListeners[listener.UserID])) // Remove closed client - case listener := <-bus.ClosedListener: - if _, exists := bus.TotalRoomListeners[listener.UserID]; !exists { + case listener := <-bus.closedListener: + if _, exists := bus.totalRoomListeners[listener.UserID]; !exists { log.Printf("Room `%s` not found", listener.UserID) continue } else { //loop through all the listeners in the room and remove the one that matches - for i, v := range bus.TotalRoomListeners[listener.UserID] { + for i, v := range bus.totalRoomListeners[listener.UserID] { if v.ResponseChan == listener.ResponseChan { - bus.TotalRoomListeners[listener.UserID] = append(bus.TotalRoomListeners[listener.UserID][:i], bus.TotalRoomListeners[listener.UserID][i+1:]...) + bus.totalRoomListeners[listener.UserID] = append(bus.totalRoomListeners[listener.UserID][:i], bus.totalRoomListeners[listener.UserID][i+1:]...) close(listener.ResponseChan) - log.Printf("Removed listener from room: `%s`. %d registered clients", listener.UserID, len(bus.TotalRoomListeners[listener.UserID])) + log.Printf("Removed listener from room: `%s`. %d registered clients", listener.UserID, len(bus.totalRoomListeners[listener.UserID])) break } } } // Broadcast message to client - case eventMsg := <-bus.Message: - if _, exists := bus.TotalRoomListeners[eventMsg.UserID]; !exists { + case eventMsg := <-bus.message: + if _, exists := bus.totalRoomListeners[eventMsg.UserID]; !exists { log.Printf("Room `%s` not found, could not send message: `%s`", eventMsg.UserID, eventMsg.Message) continue } else { - for _, roomListener := range bus.TotalRoomListeners[eventMsg.UserID] { + for _, roomListener := range bus.totalRoomListeners[eventMsg.UserID] { roomListener.ResponseChan <- eventMsg.Message } } @@ -131,15 +82,34 @@ func (bus *EventBus) listen() { } } -func (bus *EventBus) PublishMessage(eventMsg models.EventInterface) error { - bus.Logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID()) +func (bus *eventBus) PublishMessage(eventMsg models.EventInterface) error { + bus.logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID()) payload, err := json.Marshal(eventMsg) if err != nil { return err } - bus.Message <- EventBusMessage{ + bus.message <- EventBusMessage{ UserID: eventMsg.GetUserID(), Message: string(payload), } return nil } + +func (bus *eventBus) AddListener(listener *EventBusListener) { + bus.newListener <- listener +} +func (bus *eventBus) RemoveListener(listener *EventBusListener) { + bus.closedListener <- listener +} +func (bus *eventBus) TotalRooms() int { + return len(bus.totalRoomListeners) +} + +func (bus *eventBus) TotalListenersByRoom(room string) int { + listeners, ok := bus.totalRoomListeners[room] + if !ok { + return 0 + } else { + return len(listeners) + } +} diff --git a/backend/pkg/event_bus/event_bus_test.go b/backend/pkg/event_bus/event_bus_test.go new file mode 100644 index 00000000..1e89dbbc --- /dev/null +++ b/backend/pkg/event_bus/event_bus_test.go @@ -0,0 +1,15 @@ +package event_bus + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestEventBusInterface(t *testing.T) { + t.Parallel() + + eventBusInstance := new(eventBus) + + //assert + require.Implements(t, (*Interface)(nil), eventBusInstance, "should implement the eventBus interface") +} diff --git a/backend/pkg/event_bus/factory.go b/backend/pkg/event_bus/factory.go new file mode 100644 index 00000000..8024c77d --- /dev/null +++ b/backend/pkg/event_bus/factory.go @@ -0,0 +1,21 @@ +package event_bus + +import ( + "fmt" + "github.com/sirupsen/logrus" +) + +func NewEventBusServer(logger logrus.FieldLogger) Interface { + fmt.Println("Creating event bus instance now.") + eventBusInstance := &eventBus{ + logger: logger, + message: make(chan EventBusMessage), + newListener: make(chan *EventBusListener), + closedListener: make(chan *EventBusListener), + totalRoomListeners: make(map[string][]*EventBusListener), + } + + // Start processing requests + go eventBusInstance.listen() + return eventBusInstance +} diff --git a/backend/pkg/event_bus/interface.go b/backend/pkg/event_bus/interface.go new file mode 100644 index 00000000..37887a40 --- /dev/null +++ b/backend/pkg/event_bus/interface.go @@ -0,0 +1,12 @@ +package event_bus + +import "github.com/fastenhealth/fasten-onprem/backend/pkg/models" + +//go:generate mockgen -source=interface.go -destination=mock/mock_event_bus.go +type Interface interface { + PublishMessage(eventMsg models.EventInterface) error + AddListener(listener *EventBusListener) + RemoveListener(listener *EventBusListener) + TotalRooms() int + TotalListenersByRoom(room string) int +} diff --git a/backend/pkg/web/handler/server_sent_event.go b/backend/pkg/web/handler/server_sent_event.go index 33924f12..b36216c0 100644 --- a/backend/pkg/web/handler/server_sent_event.go +++ b/backend/pkg/web/handler/server_sent_event.go @@ -1,11 +1,12 @@ package handler import ( + "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/fastenhealth/fasten-onprem/backend/pkg/database" "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/gin-gonic/gin" "io" - "log" ) // SSEStream is a handler for the server sent event stream (notifications from background processes) @@ -14,25 +15,41 @@ import ( // // test using: // curl -N -H "Authorization: Bearer xxxxx" http://localhost:9090/api/secure/sse/stream -func SSEStream(c *gin.Context) { - //logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry) - //databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) - v, ok := c.Get(pkg.ContextKeyTypeSSEClientChannel) - if !ok { - log.Printf("could not get client channel from context") - return - } - listener, ok := v.(*event_bus.EventBusListener) - if !ok { - return - } - c.Stream(func(w io.Writer) bool { - // Stream message to client from message channel - if msg, ok := <-listener.ResponseChan; ok { - c.SSEvent("message", msg) - return true +func SSEEventBusServerHandler(eventBus event_bus.Interface) gin.HandlerFunc { + + return func(c *gin.Context) { + //get a reference to the current user + databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) + + foundUser, err := databaseRepo.GetCurrentUser(c) + if err != nil || foundUser == nil { + c.Error(fmt.Errorf("could not find user")) + return } - return false - }) + + // Initialize client channel + clientListener := event_bus.EventBusListener{ + ResponseChan: make(chan string), + UserID: foundUser.ID.String(), + } + + // Send new connection to event server + eventBus.AddListener(&clientListener) + + defer func() { + // Send closed connection to event server + eventBus.RemoveListener(&clientListener) + }() + + c.Stream(func(w io.Writer) bool { + // Stream message to client from message channel + if msg, ok := <-clientListener.ResponseChan; ok { + c.SSEvent("message", msg) + return true + } + return false + }) + + } } diff --git a/backend/pkg/web/handler/source.go b/backend/pkg/web/handler/source.go index 6c60350a..9457ff16 100644 --- a/backend/pkg/web/handler/source.go +++ b/backend/pkg/web/handler/source.go @@ -156,6 +156,7 @@ func CreateSource(c *gin.Context) { func SourceSync(c *gin.Context) { logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry) databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) + eventBus := c.MustGet(pkg.ContextKeyTypeEventBusServer).(event_bus.Interface) logger.Infof("Get SourceCredential Credentials: %v", c.Param("sourceId")) @@ -175,7 +176,7 @@ func SourceSync(c *gin.Context) { //publish event currentUser, _ := databaseRepo.GetCurrentUser(c) - err = event_bus.GetEventBusServer(logger).PublishMessage( + err = eventBus.PublishMessage( models.NewEventSourceComplete( currentUser.ID.String(), sourceCred.ID.String(), @@ -191,6 +192,7 @@ func SourceSync(c *gin.Context) { func CreateManualSource(c *gin.Context) { logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry) databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) + eventBus := c.MustGet(pkg.ContextKeyTypeEventBusServer).(event_bus.Interface) // single file file, err := c.FormFile("file") @@ -260,7 +262,7 @@ func CreateManualSource(c *gin.Context) { //publish event currentUser, _ := databaseRepo.GetCurrentUser(c) - err = event_bus.GetEventBusServer(logger).PublishMessage( + err = eventBus.PublishMessage( models.NewEventSourceComplete( currentUser.ID.String(), manualSourceCredential.ID.String(), diff --git a/backend/pkg/web/middleware/database.go b/backend/pkg/web/middleware/database.go index 67215817..757d81f4 100644 --- a/backend/pkg/web/middleware/database.go +++ b/backend/pkg/web/middleware/database.go @@ -4,13 +4,14 @@ import ( "github.com/fastenhealth/fasten-onprem/backend/pkg" "github.com/fastenhealth/fasten-onprem/backend/pkg/config" "github.com/fastenhealth/fasten-onprem/backend/pkg/database" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ) -func RepositoryMiddleware(appConfig config.Interface, globalLogger logrus.FieldLogger) gin.HandlerFunc { +func RepositoryMiddleware(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) gin.HandlerFunc { - deviceRepo, err := database.NewRepository(appConfig, globalLogger) + deviceRepo, err := database.NewRepository(appConfig, globalLogger, eventBus) if err != nil { panic(err) } diff --git a/backend/pkg/web/middleware/event_bus.go b/backend/pkg/web/middleware/event_bus.go new file mode 100644 index 00000000..ebfc7a68 --- /dev/null +++ b/backend/pkg/web/middleware/event_bus.go @@ -0,0 +1,15 @@ +package middleware + +import ( + "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" + "github.com/gin-gonic/gin" +) + +func EventBusMiddleware(eventBus event_bus.Interface) gin.HandlerFunc { + + return func(c *gin.Context) { + c.Set(pkg.ContextKeyTypeEventBusServer, eventBus) + c.Next() + } +} diff --git a/backend/pkg/web/middleware/logger.go b/backend/pkg/web/middleware/logger.go index 7c4b5890..137d4d20 100644 --- a/backend/pkg/web/middleware/logger.go +++ b/backend/pkg/web/middleware/logger.go @@ -28,7 +28,7 @@ import ( var timeFormat = "02/Jan/2006:15:04:05 -0700" -// Logger is the logrus logger handler +// logger is the logrus logger handler func LoggerMiddleware(logger *logrus.Entry) gin.HandlerFunc { hostname, err := os.Hostname() diff --git a/backend/pkg/web/middleware/server_sent_event.go b/backend/pkg/web/middleware/server_sent_event.go index 3c876c25..17deef6f 100644 --- a/backend/pkg/web/middleware/server_sent_event.go +++ b/backend/pkg/web/middleware/server_sent_event.go @@ -1,12 +1,7 @@ package middleware import ( - "fmt" - "github.com/fastenhealth/fasten-onprem/backend/pkg" - "github.com/fastenhealth/fasten-onprem/backend/pkg/database" - "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/gin-gonic/gin" - "github.com/sirupsen/logrus" ) func SSEHeaderMiddleware() gin.HandlerFunc { @@ -18,39 +13,3 @@ func SSEHeaderMiddleware() gin.HandlerFunc { c.Next() } } - -func SSEEventBusServerMiddleware(logger *logrus.Entry) gin.HandlerFunc { - - // get reference to streaming server singleton - bus := event_bus.GetEventBusServer(logger) - - return func(c *gin.Context) { - //get a reference to the current user - databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) - - foundUser, err := databaseRepo.GetCurrentUser(c) - if err != nil || foundUser == nil { - c.Error(fmt.Errorf("could not find user")) - return - } - - // Initialize client channel - clientListener := event_bus.EventBusListener{ - ResponseChan: make(chan string), - UserID: foundUser.ID.String(), - } - - // Send new connection to event server - bus.NewListener <- &clientListener - - defer func() { - // Send closed connection to event server - bus.ClosedListener <- &clientListener - }() - - c.Set(pkg.ContextKeyTypeSSEEventBusServer, &bus) - c.Set(pkg.ContextKeyTypeSSEClientChannel, &clientListener) - - c.Next() - } -} diff --git a/backend/pkg/web/server.go b/backend/pkg/web/server.go index beb9a36c..ec9d441e 100644 --- a/backend/pkg/web/server.go +++ b/backend/pkg/web/server.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg/config" "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" + "github.com/fastenhealth/fasten-onprem/backend/pkg/models" "github.com/fastenhealth/fasten-onprem/backend/pkg/web/handler" "github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware" "github.com/gin-gonic/gin" @@ -15,16 +16,18 @@ import ( ) type AppEngine struct { - Config config.Interface - Logger *logrus.Entry + Config config.Interface + Logger *logrus.Entry + EventBus event_bus.Interface } func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { r := gin.New() r.Use(middleware.LoggerMiddleware(ae.Logger)) - r.Use(middleware.RepositoryMiddleware(ae.Config, ae.Logger)) + r.Use(middleware.RepositoryMiddleware(ae.Config, ae.Logger, ae.EventBus)) r.Use(middleware.ConfigMiddleware(ae.Config)) + r.Use(middleware.EventBusMiddleware(ae.EventBus)) r.Use(gin.Recovery()) basePath := ae.Config.GetString("web.listen.basepath") @@ -39,14 +42,11 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { // check if the /web folder is populated. // check if access to database - bus := event_bus.GetEventBusServer(ae.Logger) - bus.Message <- event_bus.EventBusMessage{ - UserID: "heartbeat", - Message: "sse heartbeat", - } + keepAliveMsg := models.NewEventKeepAlive("heartbeat") + err := ae.EventBus.PublishMessage(keepAliveMsg) c.JSON(http.StatusOK, gin.H{ - "success": true, + "success": err == nil, }) }) @@ -84,8 +84,7 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { if runtime.GOOS != "windows" { secure.GET("/events/stream", middleware.SSEHeaderMiddleware(), - middleware.SSEEventBusServerMiddleware(ae.Logger), - handler.SSEStream, + handler.SSEEventBusServerHandler(ae.EventBus), ) } }