From 6a3afe150e2bca4b1407ff34097c5d4348ccf5fe Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Fri, 8 Sep 2023 11:27:38 -0700 Subject: [PATCH] working private event notifications. --- backend/pkg/database/sqlite_repository.go | 5 +- backend/pkg/web/handler/server_sent_event.go | 6 +- .../pkg/web/middleware/server_sent_event.go | 22 +++++-- backend/pkg/web/server.go | 5 +- backend/pkg/web/sse/event_bus.go | 65 ++++++++++++++----- 5 files changed, 78 insertions(+), 25 deletions(-) diff --git a/backend/pkg/database/sqlite_repository.go b/backend/pkg/database/sqlite_repository.go index cd0bfc71..5cfed3ea 100644 --- a/backend/pkg/database/sqlite_repository.go +++ b/backend/pkg/database/sqlite_repository.go @@ -357,7 +357,10 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM //wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw) } - sr.EventBus.Message <- fmt.Sprintf("resource.upsert %s/%s", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID) + sr.EventBus.Message <- sse.EventBusMessage{ + Message: fmt.Sprintf("resource.upsert %s/%s", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID), + UserID: currentUser.ID.String(), + } createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{ SourceID: wrappedFhirResourceModel.GetSourceID(), SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(), diff --git a/backend/pkg/web/handler/server_sent_event.go b/backend/pkg/web/handler/server_sent_event.go index 76a71903..d3f9ae7b 100644 --- a/backend/pkg/web/handler/server_sent_event.go +++ b/backend/pkg/web/handler/server_sent_event.go @@ -5,6 +5,7 @@ import ( "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" "github.com/gin-gonic/gin" "io" + "log" ) // SSEStream is a handler for the server sent event stream (notifications from background processes) @@ -19,15 +20,16 @@ func SSEStream(c *gin.Context) { //databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) v, ok := c.Get(pkg.ContextKeyTypeSSEClientChannel) if !ok { + log.Printf("could not get client channel from context") return } - clientChan, ok := v.(sse.ClientChan) + listener, ok := v.(sse.EventBusListener) if !ok { return } c.Stream(func(w io.Writer) bool { // Stream message to client from message channel - if msg, ok := <-clientChan; ok { + if msg, ok := <-listener.ResponseChan; ok { c.SSEvent("message", msg) return true } diff --git a/backend/pkg/web/middleware/server_sent_event.go b/backend/pkg/web/middleware/server_sent_event.go index 9e7811ce..2e594e34 100644 --- a/backend/pkg/web/middleware/server_sent_event.go +++ b/backend/pkg/web/middleware/server_sent_event.go @@ -1,7 +1,9 @@ package middleware import ( + "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/fastenhealth/fasten-onprem/backend/pkg/database" "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" "github.com/gin-gonic/gin" ) @@ -22,19 +24,31 @@ func SSEEventBusServerMiddleware() gin.HandlerFunc { bus := sse.GetEventBusServer() return func(c *gin.Context) { + //get a reference to the current user + databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) + + foundUser, err := databaseRepo.GetCurrentUser(c) + if err != nil || foundUser == nil { + c.Error(fmt.Errorf("could not find user")) + return + } + // Initialize client channel - clientChan := make(sse.ClientChan) + clientListener := sse.EventBusListener{ + ResponseChan: make(chan string), + UserID: foundUser.ID.String(), + } // Send new connection to event server - bus.NewClients <- clientChan + bus.NewListener <- clientListener defer func() { // Send closed connection to event server - bus.ClosedClients <- clientChan + bus.ClosedListener <- clientListener }() c.Set(pkg.ContextKeyTypeSSEEventBusServer, bus) - c.Set(pkg.ContextKeyTypeSSEClientChannel, clientChan) + c.Set(pkg.ContextKeyTypeSSEClientChannel, clientListener) c.Next() } diff --git a/backend/pkg/web/server.go b/backend/pkg/web/server.go index 6013f43e..c41523ab 100644 --- a/backend/pkg/web/server.go +++ b/backend/pkg/web/server.go @@ -39,7 +39,10 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { // check if access to database bus := sse.GetEventBusServer() - bus.Message <- "sse heartbeat" + bus.Message <- sse.EventBusMessage{ + UserID: "heartbeat", + Message: "sse heartbeat", + } c.JSON(http.StatusOK, gin.H{ "success": true, diff --git a/backend/pkg/web/sse/event_bus.go b/backend/pkg/web/sse/event_bus.go index 79f8dfcf..6a582645 100644 --- a/backend/pkg/web/sse/event_bus.go +++ b/backend/pkg/web/sse/event_bus.go @@ -24,10 +24,10 @@ func GetEventBusServer() *EventBus { if singletonEventBusInstance == nil { fmt.Println("Creating single instance now.") singletonEventBusInstance = &EventBus{ - Message: make(chan string), - NewClients: make(chan chan string), - ClosedClients: make(chan chan string), - TotalClients: make(map[chan string]bool), + Message: make(chan EventBusMessage), + NewListener: make(chan EventBusListener), + ClosedListener: make(chan EventBusListener), + TotalRoomListeners: make(map[string][]EventBusListener), } // Start processing requests @@ -46,16 +46,26 @@ func GetEventBusServer() *EventBus { // and broadcasting events to those clients. type EventBus struct { // Events are pushed to this channel by the main events-gathering routine - Message chan string + Message chan EventBusMessage // New client connections - NewClients chan chan string + NewListener chan EventBusListener // Closed client connections - ClosedClients chan chan string + ClosedListener chan EventBusListener // Total client connections - TotalClients map[chan string]bool + TotalRoomListeners map[string][]EventBusListener +} + +type EventBusListener struct { + ResponseChan chan string + UserID string +} + +type EventBusMessage struct { + UserID string + Message string } // It Listens all incoming requests from clients. @@ -65,21 +75,42 @@ func (bus *EventBus) listen() { for { select { // Add new available client - case client := <-bus.NewClients: - bus.TotalClients[client] = true - log.Printf("Client added. %d registered clients", len(bus.TotalClients)) + case listener := <-bus.NewListener: + //check if this userId room already exists, or create it + if _, exists := bus.TotalRoomListeners[listener.UserID]; !exists { + bus.TotalRoomListeners[listener.UserID] = []EventBusListener{} + } + bus.TotalRoomListeners[listener.UserID] = append(bus.TotalRoomListeners[listener.UserID], listener) + log.Printf("Listener added to room: `%s`. %d registered listeners", listener.UserID, len(bus.TotalRoomListeners[listener.UserID])) // Remove closed client - case client := <-bus.ClosedClients: - delete(bus.TotalClients, client) - close(client) - log.Printf("Removed client. %d registered clients", len(bus.TotalClients)) + case listener := <-bus.ClosedListener: + if _, exists := bus.TotalRoomListeners[listener.UserID]; !exists { + log.Printf("Room `%s` not found", listener.UserID) + continue + } else { + //loop through all the listeners in the room and remove the one that matches + for i, v := range bus.TotalRoomListeners[listener.UserID] { + if v.ResponseChan == listener.ResponseChan { + bus.TotalRoomListeners[listener.UserID] = append(bus.TotalRoomListeners[listener.UserID][:i], bus.TotalRoomListeners[listener.UserID][i+1:]...) + close(listener.ResponseChan) + log.Printf("Removed listener from room: `%s`. %d registered clients", listener.UserID, len(bus.TotalRoomListeners[listener.UserID])) + break + } + } + } // Broadcast message to client case eventMsg := <-bus.Message: - for clientMessageChan := range bus.TotalClients { - clientMessageChan <- eventMsg + if _, exists := bus.TotalRoomListeners[eventMsg.UserID]; !exists { + log.Printf("Room `%s` not found, could not send message: `%s`", eventMsg.UserID, eventMsg.Message) + continue + } else { + for _, roomListener := range bus.TotalRoomListeners[eventMsg.UserID] { + roomListener.ResponseChan <- eventMsg.Message + } } + } } }