moved event bus into its own package.

created models for event bus messaging.
added logger.
added source complete and sync events.
This commit is contained in:
Jason Kulatunga 2023-09-09 08:24:25 -07:00
parent b2bff9ccdd
commit 9e1c7455d1
9 changed files with 143 additions and 20 deletions

View File

@ -7,10 +7,10 @@ import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/utils"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
sourceModel "github.com/fastenhealth/fasten-sources/clients/models"
"github.com/gin-gonic/gin"
"github.com/glebarez/sqlite"
@ -62,7 +62,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger)
AppConfig: appConfig,
Logger: globalLogger,
GormClient: database,
EventBus: sse.GetEventBusServer(),
EventBus: event_bus.GetEventBusServer(globalLogger),
}
//TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized.
@ -95,7 +95,7 @@ type SqliteRepository struct {
GormClient *gorm.DB
EventBus *sse.EventBus
EventBus *event_bus.EventBus
}
func (sr *SqliteRepository) Migrate() error {
@ -357,10 +357,18 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM
//wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw)
}
sr.EventBus.Message <- sse.EventBusMessage{
Message: fmt.Sprintf("resource.upsert %s/%s", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID),
UserID: currentUser.ID.String(),
eventSourceSync := models.NewEventSourceSync(
currentUser.ID.String(),
wrappedFhirResourceModel.GetSourceID().String(),
wrappedFhirResourceModel.GetSourceResourceType(),
wrappedFhirResourceModel.GetSourceResourceID(),
)
err = sr.EventBus.PublishMessage(eventSourceSync)
if err != nil {
sr.Logger.Warnf("ignoring: an error occurred while publishing event to EventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err)
}
createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{
SourceID: wrappedFhirResourceModel.GetSourceID(),
SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(),

View File

@ -1,7 +1,10 @@
package sse
package event_bus
import (
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/sirupsen/logrus"
"log"
"sync"
)
@ -17,13 +20,14 @@ type ClientChan chan string
// Get a reference to the EventBus singleton Start procnteessing requests
// this should be a singleton, to ensure that we're always broadcasting to the same clients
// see: https://refactoring.guru/design-patterns/singleton/go/example
func GetEventBusServer() *EventBus {
func GetEventBusServer(logger logrus.FieldLogger) *EventBus {
if singletonEventBusInstance == nil {
eventBusLock.Lock()
defer eventBusLock.Unlock()
if singletonEventBusInstance == nil {
fmt.Println("Creating single instance now.")
singletonEventBusInstance = &EventBus{
Logger: logger,
Message: make(chan EventBusMessage),
NewListener: make(chan EventBusListener),
ClosedListener: make(chan EventBusListener),
@ -45,6 +49,8 @@ func GetEventBusServer() *EventBus {
// It keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type EventBus struct {
Logger logrus.FieldLogger
// Events are pushed to this channel by the main events-gathering routine
Message chan EventBusMessage
@ -114,3 +120,16 @@ func (bus *EventBus) listen() {
}
}
}
func (bus *EventBus) PublishMessage(eventMsg models.EventInterface) error {
bus.Logger.Infof("Publishing message to room: `%s`", eventMsg.GetUserID())
payload, err := json.Marshal(eventMsg)
if err != nil {
return err
}
bus.Message <- EventBusMessage{
UserID: eventMsg.GetUserID(),
Message: string(payload),
}
return nil
}

View File

@ -0,0 +1,21 @@
package models
type EventSourceSyncStatus string
const (
EventTypeSourceSync EventSourceSyncStatus = "source_sync"
EventTypeSourceComplete EventSourceSyncStatus = "source_complete"
)
type EventInterface interface {
GetUserID() string
}
type Event struct {
UserID string `json:"-"`
EventType EventSourceSyncStatus `json:"event_type"`
}
func (e *Event) GetUserID() string {
return e.UserID
}

View File

@ -0,0 +1,16 @@
package models
type EventSourceComplete struct {
*Event `json:",inline"`
SourceID string `json:"source_id"`
}
func NewEventSourceComplete(userID string, sourceID string) *EventSourceComplete {
return &EventSourceComplete{
Event: &Event{
UserID: userID,
EventType: EventTypeSourceComplete,
},
SourceID: sourceID,
}
}

View File

@ -0,0 +1,20 @@
package models
type EventSourceSync struct {
*Event `json:",inline"`
SourceID string `json:"source_id"`
ResourceType string `json:"resource_type"`
ResourceID string `json:"resource_id"`
}
func NewEventSourceSync(userID string, sourceID string, resourceType string, resourceID string) *EventSourceSync {
return &EventSourceSync{
Event: &Event{
UserID: userID,
EventType: EventTypeSourceSync,
},
SourceID: sourceID,
ResourceType: resourceType,
ResourceID: resourceID,
}
}

View File

@ -2,7 +2,7 @@ package handler
import (
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
"io"
"log"
@ -23,7 +23,7 @@ func SSEStream(c *gin.Context) {
log.Printf("could not get client channel from context")
return
}
listener, ok := v.(sse.EventBusListener)
listener, ok := v.(event_bus.EventBusListener)
if !ok {
return
}

View File

@ -7,6 +7,7 @@ import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/jwk"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/fastenhealth/fasten-sources/clients/factory"
@ -143,7 +144,7 @@ func CreateSource(c *gin.Context) {
// after creating the source, we should do a bulk import (in the background)
summary, err := SyncSourceResources(context.WithValue(c.Request.Context(), pkg.ContextKeyTypeAuthUsername, c.Value(pkg.ContextKeyTypeAuthUsername).(string)), logger, databaseRepo, &sourceCred)
summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, &sourceCred)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
@ -166,11 +167,24 @@ func SourceSync(c *gin.Context) {
}
// after creating the source, we should do a bulk import (in the background)
summary, err := SyncSourceResources(context.WithValue(c.Request.Context(), pkg.ContextKeyTypeAuthUsername, c.Value(pkg.ContextKeyTypeAuthUsername).(string)), logger, databaseRepo, sourceCred)
summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, sourceCred)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
}
//publish event
currentUser, _ := databaseRepo.GetCurrentUser(c)
err = event_bus.GetEventBusServer(logger).PublishMessage(
models.NewEventSourceComplete(
currentUser.ID.String(),
sourceCred.ID.String(),
),
)
if err != nil {
logger.Warnf("ignoring: an error occurred while publishing sync complete event: %v", err)
}
c.JSON(http.StatusOK, gin.H{"success": true, "source": sourceCred, "data": summary})
}
@ -243,7 +257,21 @@ func CreateManualSource(c *gin.Context) {
return
}
//publish event
currentUser, _ := databaseRepo.GetCurrentUser(c)
err = event_bus.GetEventBusServer(logger).PublishMessage(
models.NewEventSourceComplete(
currentUser.ID.String(),
manualSourceCredential.ID.String(),
),
)
if err != nil {
logger.Warnf("ignoring: an error occurred while publishing sync complete event: %v", err)
}
c.JSON(http.StatusOK, gin.H{"success": true, "data": summary, "source": manualSourceCredential})
}
func GetSource(c *gin.Context) {
@ -314,3 +342,9 @@ func SyncSourceResources(c context.Context, logger *logrus.Entry, databaseRepo d
return summary, nil
}
//
func GetBackgroundContext(ginContext *gin.Context) context.Context {
return context.WithValue(ginContext.Request.Context(), pkg.ContextKeyTypeAuthUsername, ginContext.Value(pkg.ContextKeyTypeAuthUsername).(string))
}

View File

@ -4,8 +4,9 @@ import (
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
func SSEHeaderMiddleware() gin.HandlerFunc {
@ -18,10 +19,10 @@ func SSEHeaderMiddleware() gin.HandlerFunc {
}
}
func SSEEventBusServerMiddleware() gin.HandlerFunc {
func SSEEventBusServerMiddleware(logger *logrus.Entry) gin.HandlerFunc {
// get reference to streaming server singleton
bus := sse.GetEventBusServer()
bus := event_bus.GetEventBusServer(logger)
return func(c *gin.Context) {
//get a reference to the current user
@ -34,7 +35,7 @@ func SSEEventBusServerMiddleware() gin.HandlerFunc {
}
// Initialize client channel
clientListener := sse.EventBusListener{
clientListener := event_bus.EventBusListener{
ResponseChan: make(chan string),
UserID: foundUser.ID.String(),
}

View File

@ -4,9 +4,9 @@ import (
"embed"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/handler"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"net/http"
@ -38,8 +38,8 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
// check if the /web folder is populated.
// check if access to database
bus := sse.GetEventBusServer()
bus.Message <- sse.EventBusMessage{
bus := event_bus.GetEventBusServer(ae.Logger)
bus.Message <- event_bus.EventBusMessage{
UserID: "heartbeat",
Message: "sse heartbeat",
}
@ -79,7 +79,11 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
secure.POST("/query", handler.QueryResourceFhir)
//server-side-events handler
secure.GET("/events/stream", middleware.SSEHeaderMiddleware(), middleware.SSEEventBusServerMiddleware(), handler.SSEStream)
secure.GET("/events/stream",
middleware.SSEHeaderMiddleware(),
middleware.SSEEventBusServerMiddleware(ae.Logger),
handler.SSEStream,
)
}
if ae.Config.GetBool("web.allow_unsafe_endpoints") {