diff --git a/api.go b/api.go index 60e3115..6051b31 100644 --- a/api.go +++ b/api.go @@ -9,7 +9,7 @@ import ( // setupAPIRoutes sets up the event store API routes func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) { app.OnServe().BindFunc(func(se *core.ServeEvent) error { - // JSON Patch endpoint using PATCH method + // JSON Patch endpoint using PATCH method - ONE OPERATION PER REQUEST se.Router.PATCH("/api/collections/{collection}/items/{itemId}", func(e *core.RequestEvent) error { collection := e.Request.PathValue("collection") itemID := e.Request.PathValue("itemId") @@ -18,16 +18,24 @@ func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) { return e.BadRequestError("Collection and itemId are required", nil) } - var patches []PatchOperation - if err := e.BindBody(&patches); err != nil { - return e.BadRequestError("Failed to parse JSON Patch data", err) + var operation struct { + Op string `json:"op"` + Path string `json:"path"` + Value string `json:"value"` + From string `json:"from"` + } + if err := e.BindBody(&operation); err != nil { + return e.BadRequestError("Failed to parse operation data", err) } - // Create event with patches + // Create event with single operation incomingEvent := &Event{ - ItemID: itemID, + ItemID: itemID, Collection: collection, - Patches: patches, + Operation: operation.Op, + Path: operation.Path, + Value: operation.Value, + From: operation.From, } // Process the event @@ -47,8 +55,8 @@ func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) { } // Validate required fields - if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || len(incomingEvent.Patches) == 0 { - return e.BadRequestError("Missing required fields: item_id, collection, patches", nil) + if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" { + return e.BadRequestError("Missing required fields: item_id, collection, operation", nil) } // Process the event @@ -134,8 +142,8 @@ func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) { processedEvents := make([]Event, 0, len(events)) for _, incomingEvent := range events { // Validate required fields - if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || len(incomingEvent.Patches) == 0 { - return e.BadRequestError("Missing required fields in event: item_id, collection, patches", nil) + if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" { + return e.BadRequestError("Missing required fields in event: item_id, collection, operation", nil) } processedEvent, err := eventStore.ProcessEvent(&incomingEvent) diff --git a/event_store_simple.go b/event_store_simple.go index 0b453a1..f48a938 100644 --- a/event_store_simple.go +++ b/event_store_simple.go @@ -2,7 +2,6 @@ package main import ( "database/sql" - "encoding/json" "fmt" "time" @@ -23,40 +22,32 @@ func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore { // GetLatestEvent returns the latest event from the event log func (es *SimpleEventStore) GetLatestEvent() (*Event, error) { - rows, err := es.app.DB(). - Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp"). - From("events"). - OrderBy("seq DESC"). - Limit(1). - Rows() - - if err != nil { - return nil, fmt.Errorf("failed to query latest event: %w", err) - } - defer rows.Close() - - if !rows.Next() { + records, err := es.app.FindRecordsByFilter("events", "", "-seq", 1, 0, map[string]any{}) + if err != nil || len(records) == 0 { return nil, nil // No events found } - var event Event - var dataStr string - - err = rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp) - if err != nil { - return nil, fmt.Errorf("failed to scan latest event: %w", err) + record := records[0] + event := &Event{ + Seq: record.GetInt("seq"), + Hash: record.GetString("hash"), + ItemID: record.GetString("item_id"), + EventID: record.GetString("event_id"), + Collection: record.GetString("collection"), + Operation: record.GetString("operation"), + Path: record.GetString("path"), + Value: record.GetString("value"), + From: record.GetString("from"), } - // Parse patches from data field - if dataStr != "" { - var patches []PatchOperation - if err := json.Unmarshal([]byte(dataStr), &patches); err != nil { - return nil, fmt.Errorf("failed to unmarshal patches: %w", err) + // Parse timestamp + if timestampStr := record.GetString("timestamp"); timestampStr != "" { + if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil { + event.Timestamp = timestamp } - event.Patches = patches } - return &event, nil + return event, nil } // ProcessEvent processes an incoming event and applies it to the store @@ -71,7 +62,10 @@ func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) { event := &Event{ ItemID: incomingEvent.ItemID, Collection: incomingEvent.Collection, - Patches: incomingEvent.Patches, + Operation: incomingEvent.Operation, + Path: incomingEvent.Path, + Value: incomingEvent.Value, + From: incomingEvent.From, EventID: uuid.New().String(), Timestamp: time.Now(), } @@ -116,42 +110,47 @@ func (es *SimpleEventStore) saveEvent(event *Event) error { record.Set("item_id", event.ItemID) record.Set("event_id", event.EventID) record.Set("collection", event.Collection) + record.Set("operation", event.Operation) + record.Set("path", event.Path) + record.Set("value", event.Value) + record.Set("from", event.From) record.Set("timestamp", event.Timestamp.Format(time.RFC3339)) - // Convert patches to JSON string and store in data field - if event.Patches != nil { - patchesBytes, err := json.Marshal(event.Patches) - if err != nil { - return fmt.Errorf("failed to marshal event patches: %w", err) - } - record.Set("data", string(patchesBytes)) - } - return es.app.Save(record) } -// applyEvent applies an event to the cached data using JSON Patch operations +// applyEvent applies a single operation to the cached data func (es *SimpleEventStore) applyEvent(event *Event) error { // Get current document state currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID) if err != nil { - // If document doesn't exist, create empty one for patches to work on + // If document doesn't exist, create empty one currentDoc = map[string]interface{}{ - "id": event.ItemID, - "created_at": event.Timestamp, - "updated_at": event.Timestamp, + "id": event.ItemID, } } - // Apply JSON Patch operations + // Apply single operation patcher := &JSONPatcher{} - updatedDoc, err := patcher.ApplyPatches(currentDoc, event.Patches) + patches := []PatchOperation{{ + Op: event.Operation, + Path: event.Path, + Value: event.Value, + From: event.From, + }} + + updatedDoc, err := patcher.ApplyPatches(currentDoc, patches) if err != nil { - return fmt.Errorf("failed to apply patches: %w", err) + return fmt.Errorf("failed to apply operation: %w", err) } - // Always update the updated_at timestamp - updatedDoc["updated_at"] = event.Timestamp + // Update timestamp + updatedDoc["updated_at"] = event.Timestamp.Format(time.RFC3339) + + // Set created_at if this is a new document + if _, exists := currentDoc["created_at"]; !exists { + updatedDoc["created_at"] = event.Timestamp.Format(time.RFC3339) + } // Save the updated document return es.saveDocument(event.Collection, event.ItemID, updatedDoc) @@ -230,38 +229,31 @@ func (es *SimpleEventStore) saveDocument(collectionName, itemID string, doc map[ // GetEventsSince returns events since the given sequence number func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) { - rows, err := es.app.DB(). - Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp"). - From("events"). - Where(dbx.NewExp("seq > {:seq}", map[string]any{"seq": seq})). - OrderBy("seq ASC"). - Rows() - + records, err := es.app.FindRecordsByFilter("events", "seq > {:seq}", "seq", 1000, 0, map[string]any{"seq": seq}) if err != nil { return nil, fmt.Errorf("failed to fetch events: %w", err) } - defer rows.Close() - var events []Event - for rows.Next() { - var event Event - var dataStr string - - err := rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp) - if err != nil { - return nil, fmt.Errorf("failed to scan event: %w", err) + events := make([]Event, len(records)) + for i, record := range records { + events[i] = Event{ + Seq: record.GetInt("seq"), + Hash: record.GetString("hash"), + ItemID: record.GetString("item_id"), + EventID: record.GetString("event_id"), + Collection: record.GetString("collection"), + Operation: record.GetString("operation"), + Path: record.GetString("path"), + Value: record.GetString("value"), + From: record.GetString("from"), } - // Parse patches from data field - if dataStr != "" { - var patches []PatchOperation - if err := json.Unmarshal([]byte(dataStr), &patches); err != nil { - return nil, fmt.Errorf("failed to unmarshal patches: %w", err) + // Parse timestamp + if timestampStr := record.GetString("timestamp"); timestampStr != "" { + if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil { + events[i].Timestamp = timestamp } - event.Patches = patches } - - events = append(events, event) } return events, nil diff --git a/merging.go b/merging.go index 4cc20bd..d2de6ca 100644 --- a/merging.go +++ b/merging.go @@ -68,11 +68,14 @@ func (es *SimpleEventStore) MergeEventLog(cutoffDays int) error { } } + // TODO: Rethink merging for single operations consolidatedEvent := &Event{ Seq: nextSeq, ItemID: itemID, Collection: collectionName, - Patches: patches, + Operation: "add", // Placeholder - merging needs redesign + Path: "/", + Value: "consolidated", Timestamp: time.Now(), } diff --git a/migrations.go b/migrations.go index 5e0f564..a0ed1d0 100644 --- a/migrations.go +++ b/migrations.go @@ -24,7 +24,7 @@ func setupCollections(app core.App) error { eventsCollection.UpdateRule = nil eventsCollection.DeleteRule = nil - // Add fields + // Add fields - ONE EVENT = ONE OPERATION eventsCollection.Fields.Add(&core.NumberField{ Name: "seq", Required: true, @@ -46,7 +46,19 @@ func setupCollections(app core.App) error { Required: true, }) eventsCollection.Fields.Add(&core.TextField{ - Name: "data", + Name: "operation", + Required: true, + }) + eventsCollection.Fields.Add(&core.TextField{ + Name: "path", + Required: true, + }) + eventsCollection.Fields.Add(&core.TextField{ + Name: "value", + Required: false, + }) + eventsCollection.Fields.Add(&core.TextField{ + Name: "from", Required: false, }) eventsCollection.Fields.Add(&core.DateField{ diff --git a/types.go b/types.go index ad5012d..70f9dd5 100644 --- a/types.go +++ b/types.go @@ -2,7 +2,6 @@ package main import ( "crypto/sha256" - "encoding/json" "fmt" "time" @@ -20,13 +19,16 @@ type Event struct { EventID string `json:"event_id"` // Collection of the item that is to be manipulated, defined by the client Collection string `json:"collection"` - // RFC6902 JSON Patch operations that define the changes - Patches []PatchOperation `json:"patches"` + // Single operation - ONE EVENT = ONE OPERATION + Operation string `json:"operation"` // add, remove, replace, move, copy, test + Path string `json:"path"` // JSON Pointer to target location + Value string `json:"value"` // Value as string (for add/replace operations) + From string `json:"from"` // Source path for move/copy operations // Timestamp of the event - server generated, when the event was processed Timestamp time.Time `json:"timestamp"` } -// PatchOperation represents a single RFC6902 JSON Patch operation +// PatchOperation represents a single RFC6902 JSON Patch operation (still needed for JSONPatcher) type PatchOperation struct { Op string `json:"op"` // add, remove, replace, move, copy, test Path string `json:"path"` // JSON Pointer to target location @@ -62,12 +64,8 @@ type SyncResponse struct { func (e *Event) serialize() string { timestamp := e.Timestamp.Format(time.RFC3339Nano) - // Convert patches to JSON string - patchesBytes, _ := json.Marshal(e.Patches) - patchesStr := string(patchesBytes) - - return fmt.Sprintf("seq:%d|item_id:%s|event_id:%s|collection:%s|patches:%s|timestamp:%s", - e.Seq, e.ItemID, e.EventID, e.Collection, patchesStr, timestamp) + return fmt.Sprintf("seq:%d|item_id:%s|event_id:%s|collection:%s|operation:%s|path:%s|value:%s|from:%s|timestamp:%s", + e.Seq, e.ItemID, e.EventID, e.Collection, e.Operation, e.Path, e.Value, e.From, timestamp) } // Calculate hash of event plus previous hash