diff --git a/.gitignore b/.gitignore index e2769343..28121a83 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,5 @@ test.go config.dev.yaml .cache/ +cache/ fasten-*.db diff --git a/backend/pkg/constants.go b/backend/pkg/constants.go index b02e6860..faaaef3f 100644 --- a/backend/pkg/constants.go +++ b/backend/pkg/constants.go @@ -9,8 +9,8 @@ const ( ContextKeyTypeDatabase string = "REPOSITORY" ContextKeyTypeLogger string = "LOGGER" - ContextKeyTypeSSEServer string = "SSE_SERVER" - ContextKeyTypeSSEClientChannel string = "SSE_CLIENT_CHANNEL" + ContextKeyTypeSSEEventBusServer string = "SSE_EVENT_BUS_SERVER" + ContextKeyTypeSSEClientChannel string = "SSE_CLIENT_CHANNEL" ContextKeyTypeAuthUsername string = "AUTH_USERNAME" ContextKeyTypeAuthToken string = "AUTH_TOKEN" diff --git a/backend/pkg/web/handler/server_sent_event.go b/backend/pkg/web/handler/server_sent_event.go index e3289cc4..76a71903 100644 --- a/backend/pkg/web/handler/server_sent_event.go +++ b/backend/pkg/web/handler/server_sent_event.go @@ -2,7 +2,7 @@ package handler import ( "github.com/fastenhealth/fasten-onprem/backend/pkg" - "github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware" + "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" "github.com/gin-gonic/gin" "io" ) @@ -21,7 +21,7 @@ func SSEStream(c *gin.Context) { if !ok { return } - clientChan, ok := v.(middleware.ClientChan) + clientChan, ok := v.(sse.ClientChan) if !ok { return } diff --git a/backend/pkg/web/middleware/server_sent_event.go b/backend/pkg/web/middleware/server_sent_event.go index 9f0f545a..fc427d3c 100644 --- a/backend/pkg/web/middleware/server_sent_event.go +++ b/backend/pkg/web/middleware/server_sent_event.go @@ -3,8 +3,8 @@ package middleware import ( "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" "github.com/gin-gonic/gin" - "log" "time" ) @@ -18,10 +18,10 @@ func SSEHeaderMiddleware() gin.HandlerFunc { } } -func SSEServerMiddleware() gin.HandlerFunc { +func SSEEventBusServerMiddleware() gin.HandlerFunc { - // Initialize new streaming server - stream := NewSSEServer() + // get reference to streaming server singleton + bus := sse.GetEventBusServer() ///TODO: testing only go func() { @@ -31,90 +31,25 @@ func SSEServerMiddleware() gin.HandlerFunc { currentTime := fmt.Sprintf("The Current Time Is %v", now) // Send current time to clients message channel - stream.Message <- currentTime + bus.Message <- currentTime } }() return func(c *gin.Context) { // Initialize client channel - clientChan := make(ClientChan) + clientChan := make(sse.ClientChan) // Send new connection to event server - stream.NewClients <- clientChan + bus.NewClients <- clientChan defer func() { // Send closed connection to event server - stream.ClosedClients <- clientChan + bus.ClosedClients <- clientChan }() - c.Set(pkg.ContextKeyTypeSSEServer, stream) + c.Set(pkg.ContextKeyTypeSSEEventBusServer, bus) c.Set(pkg.ContextKeyTypeSSEClientChannel, clientChan) c.Next() } } - -//#################################################################################################### -//TODO: this should all be moved into its own package -//#################################################################################################### - -// New event messages are broadcast to all registered client connection channels -// TODO: change this to be use specific channels. -type ClientChan chan string - -// Initialize event and Start procnteessing requests -// this should be a singleton, to ensure that we're always broadcasting to the same clients -// see: https://refactoring.guru/design-patterns/singleton/go/example -func NewSSEServer() (event *SSEvent) { - event = &SSEvent{ - Message: make(chan string), - NewClients: make(chan chan string), - ClosedClients: make(chan chan string), - TotalClients: make(map[chan string]bool), - } - - go event.listen() - - return -} - -// It keeps a list of clients those are currently attached -// and broadcasting events to those clients. -type SSEvent struct { - // Events are pushed to this channel by the main events-gathering routine - Message chan string - - // New client connections - NewClients chan chan string - - // Closed client connections - ClosedClients chan chan string - - // Total client connections - TotalClients map[chan string]bool -} - -// It Listens all incoming requests from clients. -// Handles addition and removal of clients and broadcast messages to clients. -func (stream *SSEvent) listen() { - for { - select { - // Add new available client - case client := <-stream.NewClients: - stream.TotalClients[client] = true - log.Printf("Client added. %d registered clients", len(stream.TotalClients)) - - // Remove closed client - case client := <-stream.ClosedClients: - delete(stream.TotalClients, client) - close(client) - log.Printf("Removed client. %d registered clients", len(stream.TotalClients)) - - // Broadcast message to client - case eventMsg := <-stream.Message: - for clientMessageChan := range stream.TotalClients { - clientMessageChan <- eventMsg - } - } - } -} diff --git a/backend/pkg/web/server.go b/backend/pkg/web/server.go index c5834e7f..776afd85 100644 --- a/backend/pkg/web/server.go +++ b/backend/pkg/web/server.go @@ -71,7 +71,7 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { secure.POST("/query", handler.QueryResourceFhir) //server-side-events handler - secure.GET("/sse/stream", middleware.SSEHeaderMiddleware(), middleware.SSEServerMiddleware(), handler.SSEStream) + secure.GET("/sse/stream", middleware.SSEHeaderMiddleware(), middleware.SSEEventBusServerMiddleware(), handler.SSEStream) } if ae.Config.GetBool("web.allow_unsafe_endpoints") { diff --git a/backend/pkg/web/sse/event_bus.go b/backend/pkg/web/sse/event_bus.go new file mode 100644 index 00000000..0c60250b --- /dev/null +++ b/backend/pkg/web/sse/event_bus.go @@ -0,0 +1,84 @@ +package sse + +import ( + "fmt" + "log" + "sync" +) + +var eventBusLock = &sync.Mutex{} + +var singletonEventBusInstance *EventBus + +// New event messages are broadcast to all registered client connection channels +// TODO: change this to be use specific channels. +type ClientChan chan string + +// Get a reference to the EventBus singleton Start procnteessing requests +// this should be a singleton, to ensure that we're always broadcasting to the same clients +// see: https://refactoring.guru/design-patterns/singleton/go/example +func GetEventBusServer() (bus *EventBus) { + if singletonEventBusInstance == nil { + eventBusLock.Lock() + defer eventBusLock.Unlock() + if singletonEventBusInstance == nil { + fmt.Println("Creating single instance now.") + singletonEventBusInstance = &EventBus{ + Message: make(chan string), + NewClients: make(chan chan string), + ClosedClients: make(chan chan string), + TotalClients: make(map[chan string]bool), + } + + // Start processing requests + go bus.listen() + } else { + fmt.Println("Single instance already created.") + } + } else { + fmt.Println("Single instance already created.") + } + + return singletonEventBusInstance +} + +// It keeps a list of clients those are currently attached +// and broadcasting events to those clients. +type EventBus struct { + // Events are pushed to this channel by the main events-gathering routine + Message chan string + + // New client connections + NewClients chan chan string + + // Closed client connections + ClosedClients chan chan string + + // Total client connections + TotalClients map[chan string]bool +} + +// It Listens all incoming requests from clients. +// Handles addition and removal of clients and broadcast messages to clients. +func (bus *EventBus) listen() { + for { + select { + // Add new available client + case client := <-bus.NewClients: + bus.TotalClients[client] = true + log.Printf("Client added. %d registered clients", len(bus.TotalClients)) + + // Remove closed client + case client := <-bus.ClosedClients: + delete(bus.TotalClients, client) + close(client) + log.Printf("Removed client. %d registered clients", len(bus.TotalClients)) + + // Broadcast message to client + case eventMsg := <-bus.Message: + for clientMessageChan := range bus.TotalClients { + clientMessageChan <- eventMsg + } + } + } +}