diff --git a/backend/pkg/database/sqlite_repository.go b/backend/pkg/database/sqlite_repository.go index 5cfed3ea..324eef01 100644 --- a/backend/pkg/database/sqlite_repository.go +++ b/backend/pkg/database/sqlite_repository.go @@ -7,10 +7,10 @@ import ( "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg" "github.com/fastenhealth/fasten-onprem/backend/pkg/config" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/fastenhealth/fasten-onprem/backend/pkg/models" databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database" "github.com/fastenhealth/fasten-onprem/backend/pkg/utils" - "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" sourceModel "github.com/fastenhealth/fasten-sources/clients/models" "github.com/gin-gonic/gin" "github.com/glebarez/sqlite" @@ -62,7 +62,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) AppConfig: appConfig, Logger: globalLogger, GormClient: database, - EventBus: sse.GetEventBusServer(), + EventBus: event_bus.GetEventBusServer(globalLogger), } //TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized. @@ -95,7 +95,7 @@ type SqliteRepository struct { GormClient *gorm.DB - EventBus *sse.EventBus + EventBus *event_bus.EventBus } func (sr *SqliteRepository) Migrate() error { @@ -357,10 +357,18 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM //wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw) } - sr.EventBus.Message <- sse.EventBusMessage{ - Message: fmt.Sprintf("resource.upsert %s/%s", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID), - UserID: currentUser.ID.String(), + eventSourceSync := models.NewEventSourceSync( + currentUser.ID.String(), + wrappedFhirResourceModel.GetSourceID().String(), + wrappedFhirResourceModel.GetSourceResourceType(), + wrappedFhirResourceModel.GetSourceResourceID(), + ) + + err = sr.EventBus.PublishMessage(eventSourceSync) + if err != nil { + sr.Logger.Warnf("ignoring: an error occurred while publishing event to EventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) } + createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{ SourceID: wrappedFhirResourceModel.GetSourceID(), SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(), diff --git a/backend/pkg/web/sse/event_bus.go b/backend/pkg/event_bus/event_bus.go similarity index 86% rename from backend/pkg/web/sse/event_bus.go rename to backend/pkg/event_bus/event_bus.go index 6a582645..085bfcdf 100644 --- a/backend/pkg/web/sse/event_bus.go +++ b/backend/pkg/event_bus/event_bus.go @@ -1,7 +1,10 @@ -package sse +package event_bus import ( + "encoding/json" "fmt" + "github.com/fastenhealth/fasten-onprem/backend/pkg/models" + "github.com/sirupsen/logrus" "log" "sync" ) @@ -17,13 +20,14 @@ type ClientChan chan string // Get a reference to the EventBus singleton Start procnteessing requests // this should be a singleton, to ensure that we're always broadcasting to the same clients // see: https://refactoring.guru/design-patterns/singleton/go/example -func GetEventBusServer() *EventBus { +func GetEventBusServer(logger logrus.FieldLogger) *EventBus { if singletonEventBusInstance == nil { eventBusLock.Lock() defer eventBusLock.Unlock() if singletonEventBusInstance == nil { fmt.Println("Creating single instance now.") singletonEventBusInstance = &EventBus{ + Logger: logger, Message: make(chan EventBusMessage), NewListener: make(chan EventBusListener), ClosedListener: make(chan EventBusListener), @@ -45,6 +49,8 @@ func GetEventBusServer() *EventBus { // It keeps a list of clients those are currently attached // and broadcasting events to those clients. type EventBus struct { + Logger logrus.FieldLogger + // Events are pushed to this channel by the main events-gathering routine Message chan EventBusMessage @@ -114,3 +120,16 @@ func (bus *EventBus) listen() { } } } + +func (bus *EventBus) PublishMessage(eventMsg models.EventInterface) error { + bus.Logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID()) + payload, err := json.Marshal(eventMsg) + if err != nil { + return err + } + bus.Message <- EventBusMessage{ + UserID: eventMsg.GetUserID(), + Message: string(payload), + } + return nil +} diff --git a/backend/pkg/models/event.go b/backend/pkg/models/event.go new file mode 100644 index 00000000..7c5a8d89 --- /dev/null +++ b/backend/pkg/models/event.go @@ -0,0 +1,21 @@ +package models + +type EventSourceSyncStatus string + +const ( + EventTypeSourceSync EventSourceSyncStatus = "source_sync" + EventTypeSourceComplete EventSourceSyncStatus = "source_complete" +) + +type EventInterface interface { + GetUserID() string +} + +type Event struct { + UserID string `json:"-"` + EventType EventSourceSyncStatus `json:"event_type"` +} + +func (e *Event) GetUserID() string { + return e.UserID +} diff --git a/backend/pkg/models/event_source_complete.go b/backend/pkg/models/event_source_complete.go new file mode 100644 index 00000000..e1e087c5 --- /dev/null +++ b/backend/pkg/models/event_source_complete.go @@ -0,0 +1,16 @@ +package models + +type EventSourceComplete struct { + *Event `json:",inline"` + SourceID string `json:"source_id"` +} + +func NewEventSourceComplete(userID string, sourceID string) *EventSourceComplete { + return &EventSourceComplete{ + Event: &Event{ + UserID: userID, + EventType: EventTypeSourceComplete, + }, + SourceID: sourceID, + } +} diff --git a/backend/pkg/models/event_source_sync.go b/backend/pkg/models/event_source_sync.go new file mode 100644 index 00000000..5863a3b6 --- /dev/null +++ b/backend/pkg/models/event_source_sync.go @@ -0,0 +1,20 @@ +package models + +type EventSourceSync struct { + *Event `json:",inline"` + SourceID string `json:"source_id"` + ResourceType string `json:"resource_type"` + ResourceID string `json:"resource_id"` +} + +func NewEventSourceSync(userID string, sourceID string, resourceType string, resourceID string) *EventSourceSync { + return &EventSourceSync{ + Event: &Event{ + UserID: userID, + EventType: EventTypeSourceSync, + }, + SourceID: sourceID, + ResourceType: resourceType, + ResourceID: resourceID, + } +} diff --git a/backend/pkg/web/handler/server_sent_event.go b/backend/pkg/web/handler/server_sent_event.go index d3f9ae7b..634fd665 100644 --- a/backend/pkg/web/handler/server_sent_event.go +++ b/backend/pkg/web/handler/server_sent_event.go @@ -2,7 +2,7 @@ package handler import ( "github.com/fastenhealth/fasten-onprem/backend/pkg" - "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/gin-gonic/gin" "io" "log" @@ -23,7 +23,7 @@ func SSEStream(c *gin.Context) { log.Printf("could not get client channel from context") return } - listener, ok := v.(sse.EventBusListener) + listener, ok := v.(event_bus.EventBusListener) if !ok { return } diff --git a/backend/pkg/web/handler/source.go b/backend/pkg/web/handler/source.go index 559832d9..5aad6047 100644 --- a/backend/pkg/web/handler/source.go +++ b/backend/pkg/web/handler/source.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg" "github.com/fastenhealth/fasten-onprem/backend/pkg/database" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/fastenhealth/fasten-onprem/backend/pkg/jwk" "github.com/fastenhealth/fasten-onprem/backend/pkg/models" "github.com/fastenhealth/fasten-sources/clients/factory" @@ -143,7 +144,7 @@ func CreateSource(c *gin.Context) { // after creating the source, we should do a bulk import (in the background) - summary, err := SyncSourceResources(context.WithValue(c.Request.Context(), pkg.ContextKeyTypeAuthUsername, c.Value(pkg.ContextKeyTypeAuthUsername).(string)), logger, databaseRepo, &sourceCred) + summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, &sourceCred) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"success": false}) return @@ -166,11 +167,24 @@ func SourceSync(c *gin.Context) { } // after creating the source, we should do a bulk import (in the background) - summary, err := SyncSourceResources(context.WithValue(c.Request.Context(), pkg.ContextKeyTypeAuthUsername, c.Value(pkg.ContextKeyTypeAuthUsername).(string)), logger, databaseRepo, sourceCred) + summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, sourceCred) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"success": false}) return } + + //publish event + currentUser, _ := databaseRepo.GetCurrentUser(c) + err = event_bus.GetEventBusServer(logger).PublishMessage( + models.NewEventSourceComplete( + currentUser.ID.String(), + sourceCred.ID.String(), + ), + ) + if err != nil { + logger.Warnf("ignoring: an error occurred while publishing sync complete event: %v", err) + } + c.JSON(http.StatusOK, gin.H{"success": true, "source": sourceCred, "data": summary}) } @@ -243,7 +257,21 @@ func CreateManualSource(c *gin.Context) { return } + //publish event + currentUser, _ := databaseRepo.GetCurrentUser(c) + + err = event_bus.GetEventBusServer(logger).PublishMessage( + models.NewEventSourceComplete( + currentUser.ID.String(), + manualSourceCredential.ID.String(), + ), + ) + if err != nil { + logger.Warnf("ignoring: an error occurred while publishing sync complete event: %v", err) + } + c.JSON(http.StatusOK, gin.H{"success": true, "data": summary, "source": manualSourceCredential}) + } func GetSource(c *gin.Context) { @@ -314,3 +342,9 @@ func SyncSourceResources(c context.Context, logger *logrus.Entry, databaseRepo d return summary, nil } + +// + +func GetBackgroundContext(ginContext *gin.Context) context.Context { + return context.WithValue(ginContext.Request.Context(), pkg.ContextKeyTypeAuthUsername, ginContext.Value(pkg.ContextKeyTypeAuthUsername).(string)) +} diff --git a/backend/pkg/web/middleware/server_sent_event.go b/backend/pkg/web/middleware/server_sent_event.go index 2e594e34..52174b4c 100644 --- a/backend/pkg/web/middleware/server_sent_event.go +++ b/backend/pkg/web/middleware/server_sent_event.go @@ -4,8 +4,9 @@ import ( "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg" "github.com/fastenhealth/fasten-onprem/backend/pkg/database" - "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" ) func SSEHeaderMiddleware() gin.HandlerFunc { @@ -18,10 +19,10 @@ func SSEHeaderMiddleware() gin.HandlerFunc { } } -func SSEEventBusServerMiddleware() gin.HandlerFunc { +func SSEEventBusServerMiddleware(logger *logrus.Entry) gin.HandlerFunc { // get reference to streaming server singleton - bus := sse.GetEventBusServer() + bus := event_bus.GetEventBusServer(logger) return func(c *gin.Context) { //get a reference to the current user @@ -34,7 +35,7 @@ func SSEEventBusServerMiddleware() gin.HandlerFunc { } // Initialize client channel - clientListener := sse.EventBusListener{ + clientListener := event_bus.EventBusListener{ ResponseChan: make(chan string), UserID: foundUser.ID.String(), } diff --git a/backend/pkg/web/server.go b/backend/pkg/web/server.go index a4d8b500..278623fb 100644 --- a/backend/pkg/web/server.go +++ b/backend/pkg/web/server.go @@ -4,9 +4,9 @@ import ( "embed" "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg/config" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/fastenhealth/fasten-onprem/backend/pkg/web/handler" "github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware" - "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "net/http" @@ -38,8 +38,8 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { // check if the /web folder is populated. // check if access to database - bus := sse.GetEventBusServer() - bus.Message <- sse.EventBusMessage{ + bus := event_bus.GetEventBusServer(ae.Logger) + bus.Message <- event_bus.EventBusMessage{ UserID: "heartbeat", Message: "sse heartbeat", } @@ -79,7 +79,11 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { secure.POST("/query", handler.QueryResourceFhir) //server-side-events handler - secure.GET("/events/stream", middleware.SSEHeaderMiddleware(), middleware.SSEEventBusServerMiddleware(), handler.SSEStream) + secure.GET("/events/stream", + middleware.SSEHeaderMiddleware(), + middleware.SSEEventBusServerMiddleware(ae.Logger), + handler.SSEStream, + ) } if ae.Config.GetBool("web.allow_unsafe_endpoints") {