From db79ff36228236fd5de2ccee25a60fee46a628e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Majdand=C5=BEi=C4=87?= Date: Mon, 17 Jun 2024 00:00:09 +0200 Subject: [PATCH] Update realtime functionality --- .gitignore | 1 + main.go | 12 +++++++++- realtime.go | 63 ++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 55 insertions(+), 21 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/main.go b/main.go index 8ade937..5166956 100644 --- a/main.go +++ b/main.go @@ -45,6 +45,16 @@ func main() { listener.Collections = []string{COLLECTION_NAME} listener.initialize() - log.Printf("zzz...\n") + for { + select { + case event := <-listener.Create: + log.Printf("Create event: %+v\n", event) + case event := <-listener.Update: + log.Printf("Update event: %+v\n", event) + case event := <-listener.Delete: + log.Printf("Delete event: %+v\n", event) + } + } + time.Sleep(1 * time.Hour) } diff --git a/realtime.go b/realtime.go index 5b53e44..32f2571 100644 --- a/realtime.go +++ b/realtime.go @@ -12,7 +12,9 @@ import ( type RealtimeListener struct { Url string Collections []string - Chan chan string + Create chan PBEvent + Update chan PBEvent + Delete chan PBEvent client *sse.Client } @@ -21,18 +23,40 @@ type Subscription struct { Subscriptions []string `json:"subscriptions"` } -func (listener RealtimeListener) handlePbConnect(msg *sse.Event) { - var connectEvent ConnectEvent - err := json.Unmarshal(msg.Data, &connectEvent) +func (listener RealtimeListener) handlePbEvent(msg *sse.Event) { + var pbEvent = new(PBEvent) + err := json.Unmarshal(msg.Data, &pbEvent) if err != nil { - log.Printf("Error unmarshalling PB_CONNECT event: %v\n", err) + log.Printf("Error unmarshalling event: %v\n", err) return } - log.Printf("Client ID: %v\n", connectEvent.ClientId) + if pbEvent.ClientId != "" { + listener.doSubscribe(pbEvent.ClientId) + } + + if pbEvent.Action != "" { + go listener.shipEvent(*pbEvent) + } +} + +func (listener RealtimeListener) shipEvent(event PBEvent) { + switch event.Action { + case "create": + listener.Create <- event + case "update": + listener.Update <- event + case "delete": + listener.Delete <- event + default: + log.Printf("Unknown action: %v\n", event.Action) + } +} + +func (listener RealtimeListener) doSubscribe(clientId string) { subscription := Subscription{ - ClientId: connectEvent.ClientId, - Subscriptions: []string{"workshop_link"}, + ClientId: clientId, + Subscriptions: listener.Collections, } body, err := json.Marshal(subscription) @@ -47,18 +71,17 @@ func (listener RealtimeListener) handlePbConnect(msg *sse.Event) { return } defer resp.Body.Close() -} -func (listener RealtimeListener) handlePbEvent(msg *sse.Event) { - log.Println(msg) - // listener.Chan <- msg.Data -} - -func (listener RealtimeListener) initialize() { - listener.client = sse.NewClient(listener.Url) - listener.client.Subscribe("PB_CONNECT", listener.handlePbConnect) - - for _, collection := range listener.Collections { - listener.client.Subscribe(collection, listener.handlePbEvent) + if resp.StatusCode != http.StatusNoContent { + log.Printf("Subscription request failed with status: %v\n", resp.Status) } } + +func (listener *RealtimeListener) initialize() { + listener.Update = make(chan PBEvent, 32) + listener.Create = make(chan PBEvent, 32) + listener.Delete = make(chan PBEvent, 32) + log.Print("initialized") + listener.client = sse.NewClient(listener.Url) + go listener.client.Subscribe("", listener.handlePbEvent) +}