Update realtime functionality
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
.idea
|
12
main.go
12
main.go
@@ -45,6 +45,16 @@ func main() {
|
||||
listener.Collections = []string{COLLECTION_NAME}
|
||||
listener.initialize()
|
||||
|
||||
log.Printf("zzz...\n")
|
||||
for {
|
||||
select {
|
||||
case event := <-listener.Create:
|
||||
log.Printf("Create event: %+v\n", event)
|
||||
case event := <-listener.Update:
|
||||
log.Printf("Update event: %+v\n", event)
|
||||
case event := <-listener.Delete:
|
||||
log.Printf("Delete event: %+v\n", event)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Hour)
|
||||
}
|
||||
|
61
realtime.go
61
realtime.go
@@ -12,7 +12,9 @@ import (
|
||||
type RealtimeListener struct {
|
||||
Url string
|
||||
Collections []string
|
||||
Chan chan string
|
||||
Create chan PBEvent
|
||||
Update chan PBEvent
|
||||
Delete chan PBEvent
|
||||
client *sse.Client
|
||||
}
|
||||
|
||||
@@ -21,18 +23,40 @@ type Subscription struct {
|
||||
Subscriptions []string `json:"subscriptions"`
|
||||
}
|
||||
|
||||
func (listener RealtimeListener) handlePbConnect(msg *sse.Event) {
|
||||
var connectEvent ConnectEvent
|
||||
err := json.Unmarshal(msg.Data, &connectEvent)
|
||||
func (listener RealtimeListener) handlePbEvent(msg *sse.Event) {
|
||||
var pbEvent = new(PBEvent)
|
||||
err := json.Unmarshal(msg.Data, &pbEvent)
|
||||
if err != nil {
|
||||
log.Printf("Error unmarshalling PB_CONNECT event: %v\n", err)
|
||||
log.Printf("Error unmarshalling event: %v\n", err)
|
||||
return
|
||||
}
|
||||
log.Printf("Client ID: %v\n", connectEvent.ClientId)
|
||||
|
||||
if pbEvent.ClientId != "" {
|
||||
listener.doSubscribe(pbEvent.ClientId)
|
||||
}
|
||||
|
||||
if pbEvent.Action != "" {
|
||||
go listener.shipEvent(*pbEvent)
|
||||
}
|
||||
}
|
||||
|
||||
func (listener RealtimeListener) shipEvent(event PBEvent) {
|
||||
switch event.Action {
|
||||
case "create":
|
||||
listener.Create <- event
|
||||
case "update":
|
||||
listener.Update <- event
|
||||
case "delete":
|
||||
listener.Delete <- event
|
||||
default:
|
||||
log.Printf("Unknown action: %v\n", event.Action)
|
||||
}
|
||||
}
|
||||
|
||||
func (listener RealtimeListener) doSubscribe(clientId string) {
|
||||
subscription := Subscription{
|
||||
ClientId: connectEvent.ClientId,
|
||||
Subscriptions: []string{"workshop_link"},
|
||||
ClientId: clientId,
|
||||
Subscriptions: listener.Collections,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(subscription)
|
||||
@@ -47,18 +71,17 @@ func (listener RealtimeListener) handlePbConnect(msg *sse.Event) {
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
log.Printf("Subscription request failed with status: %v\n", resp.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func (listener RealtimeListener) handlePbEvent(msg *sse.Event) {
|
||||
log.Println(msg)
|
||||
// listener.Chan <- msg.Data
|
||||
}
|
||||
|
||||
func (listener RealtimeListener) initialize() {
|
||||
func (listener *RealtimeListener) initialize() {
|
||||
listener.Update = make(chan PBEvent, 32)
|
||||
listener.Create = make(chan PBEvent, 32)
|
||||
listener.Delete = make(chan PBEvent, 32)
|
||||
log.Print("initialized")
|
||||
listener.client = sse.NewClient(listener.Url)
|
||||
listener.client.Subscribe("PB_CONNECT", listener.handlePbConnect)
|
||||
|
||||
for _, collection := range listener.Collections {
|
||||
listener.client.Subscribe(collection, listener.handlePbEvent)
|
||||
}
|
||||
go listener.client.Subscribe("", listener.handlePbEvent)
|
||||
}
|
||||
|
Reference in New Issue
Block a user