Deretardify the fucking events
Good job claude complicate everything why don't you
This commit is contained in:
30
api.go
30
api.go
@@ -9,7 +9,7 @@ import (
|
||||
// setupAPIRoutes sets up the event store API routes
|
||||
func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) {
|
||||
app.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
||||
// JSON Patch endpoint using PATCH method
|
||||
// JSON Patch endpoint using PATCH method - ONE OPERATION PER REQUEST
|
||||
se.Router.PATCH("/api/collections/{collection}/items/{itemId}", func(e *core.RequestEvent) error {
|
||||
collection := e.Request.PathValue("collection")
|
||||
itemID := e.Request.PathValue("itemId")
|
||||
@@ -18,16 +18,24 @@ func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) {
|
||||
return e.BadRequestError("Collection and itemId are required", nil)
|
||||
}
|
||||
|
||||
var patches []PatchOperation
|
||||
if err := e.BindBody(&patches); err != nil {
|
||||
return e.BadRequestError("Failed to parse JSON Patch data", err)
|
||||
var operation struct {
|
||||
Op string `json:"op"`
|
||||
Path string `json:"path"`
|
||||
Value string `json:"value"`
|
||||
From string `json:"from"`
|
||||
}
|
||||
if err := e.BindBody(&operation); err != nil {
|
||||
return e.BadRequestError("Failed to parse operation data", err)
|
||||
}
|
||||
|
||||
// Create event with patches
|
||||
// Create event with single operation
|
||||
incomingEvent := &Event{
|
||||
ItemID: itemID,
|
||||
ItemID: itemID,
|
||||
Collection: collection,
|
||||
Patches: patches,
|
||||
Operation: operation.Op,
|
||||
Path: operation.Path,
|
||||
Value: operation.Value,
|
||||
From: operation.From,
|
||||
}
|
||||
|
||||
// Process the event
|
||||
@@ -47,8 +55,8 @@ func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) {
|
||||
}
|
||||
|
||||
// Validate required fields
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || len(incomingEvent.Patches) == 0 {
|
||||
return e.BadRequestError("Missing required fields: item_id, collection, patches", nil)
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
return e.BadRequestError("Missing required fields: item_id, collection, operation", nil)
|
||||
}
|
||||
|
||||
// Process the event
|
||||
@@ -134,8 +142,8 @@ func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) {
|
||||
processedEvents := make([]Event, 0, len(events))
|
||||
for _, incomingEvent := range events {
|
||||
// Validate required fields
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || len(incomingEvent.Patches) == 0 {
|
||||
return e.BadRequestError("Missing required fields in event: item_id, collection, patches", nil)
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
return e.BadRequestError("Missing required fields in event: item_id, collection, operation", nil)
|
||||
}
|
||||
|
||||
processedEvent, err := eventStore.ProcessEvent(&incomingEvent)
|
||||
|
@@ -2,7 +2,6 @@ package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -23,40 +22,32 @@ func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore {
|
||||
|
||||
// GetLatestEvent returns the latest event from the event log
|
||||
func (es *SimpleEventStore) GetLatestEvent() (*Event, error) {
|
||||
rows, err := es.app.DB().
|
||||
Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp").
|
||||
From("events").
|
||||
OrderBy("seq DESC").
|
||||
Limit(1).
|
||||
Rows()
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query latest event: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
records, err := es.app.FindRecordsByFilter("events", "", "-seq", 1, 0, map[string]any{})
|
||||
if err != nil || len(records) == 0 {
|
||||
return nil, nil // No events found
|
||||
}
|
||||
|
||||
var event Event
|
||||
var dataStr string
|
||||
|
||||
err = rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to scan latest event: %w", err)
|
||||
record := records[0]
|
||||
event := &Event{
|
||||
Seq: record.GetInt("seq"),
|
||||
Hash: record.GetString("hash"),
|
||||
ItemID: record.GetString("item_id"),
|
||||
EventID: record.GetString("event_id"),
|
||||
Collection: record.GetString("collection"),
|
||||
Operation: record.GetString("operation"),
|
||||
Path: record.GetString("path"),
|
||||
Value: record.GetString("value"),
|
||||
From: record.GetString("from"),
|
||||
}
|
||||
|
||||
// Parse patches from data field
|
||||
if dataStr != "" {
|
||||
var patches []PatchOperation
|
||||
if err := json.Unmarshal([]byte(dataStr), &patches); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal patches: %w", err)
|
||||
// Parse timestamp
|
||||
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
||||
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
||||
event.Timestamp = timestamp
|
||||
}
|
||||
event.Patches = patches
|
||||
}
|
||||
|
||||
return &event, nil
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// ProcessEvent processes an incoming event and applies it to the store
|
||||
@@ -71,7 +62,10 @@ func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
||||
event := &Event{
|
||||
ItemID: incomingEvent.ItemID,
|
||||
Collection: incomingEvent.Collection,
|
||||
Patches: incomingEvent.Patches,
|
||||
Operation: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
EventID: uuid.New().String(),
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
@@ -116,42 +110,47 @@ func (es *SimpleEventStore) saveEvent(event *Event) error {
|
||||
record.Set("item_id", event.ItemID)
|
||||
record.Set("event_id", event.EventID)
|
||||
record.Set("collection", event.Collection)
|
||||
record.Set("operation", event.Operation)
|
||||
record.Set("path", event.Path)
|
||||
record.Set("value", event.Value)
|
||||
record.Set("from", event.From)
|
||||
record.Set("timestamp", event.Timestamp.Format(time.RFC3339))
|
||||
|
||||
// Convert patches to JSON string and store in data field
|
||||
if event.Patches != nil {
|
||||
patchesBytes, err := json.Marshal(event.Patches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal event patches: %w", err)
|
||||
}
|
||||
record.Set("data", string(patchesBytes))
|
||||
}
|
||||
|
||||
return es.app.Save(record)
|
||||
}
|
||||
|
||||
// applyEvent applies an event to the cached data using JSON Patch operations
|
||||
// applyEvent applies a single operation to the cached data
|
||||
func (es *SimpleEventStore) applyEvent(event *Event) error {
|
||||
// Get current document state
|
||||
currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID)
|
||||
if err != nil {
|
||||
// If document doesn't exist, create empty one for patches to work on
|
||||
// If document doesn't exist, create empty one
|
||||
currentDoc = map[string]interface{}{
|
||||
"id": event.ItemID,
|
||||
"created_at": event.Timestamp,
|
||||
"updated_at": event.Timestamp,
|
||||
"id": event.ItemID,
|
||||
}
|
||||
}
|
||||
|
||||
// Apply JSON Patch operations
|
||||
// Apply single operation
|
||||
patcher := &JSONPatcher{}
|
||||
updatedDoc, err := patcher.ApplyPatches(currentDoc, event.Patches)
|
||||
patches := []PatchOperation{{
|
||||
Op: event.Operation,
|
||||
Path: event.Path,
|
||||
Value: event.Value,
|
||||
From: event.From,
|
||||
}}
|
||||
|
||||
updatedDoc, err := patcher.ApplyPatches(currentDoc, patches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to apply patches: %w", err)
|
||||
return fmt.Errorf("failed to apply operation: %w", err)
|
||||
}
|
||||
|
||||
// Always update the updated_at timestamp
|
||||
updatedDoc["updated_at"] = event.Timestamp
|
||||
// Update timestamp
|
||||
updatedDoc["updated_at"] = event.Timestamp.Format(time.RFC3339)
|
||||
|
||||
// Set created_at if this is a new document
|
||||
if _, exists := currentDoc["created_at"]; !exists {
|
||||
updatedDoc["created_at"] = event.Timestamp.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
// Save the updated document
|
||||
return es.saveDocument(event.Collection, event.ItemID, updatedDoc)
|
||||
@@ -230,38 +229,31 @@ func (es *SimpleEventStore) saveDocument(collectionName, itemID string, doc map[
|
||||
|
||||
// GetEventsSince returns events since the given sequence number
|
||||
func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
rows, err := es.app.DB().
|
||||
Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp").
|
||||
From("events").
|
||||
Where(dbx.NewExp("seq > {:seq}", map[string]any{"seq": seq})).
|
||||
OrderBy("seq ASC").
|
||||
Rows()
|
||||
|
||||
records, err := es.app.FindRecordsByFilter("events", "seq > {:seq}", "seq", 1000, 0, map[string]any{"seq": seq})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch events: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var events []Event
|
||||
for rows.Next() {
|
||||
var event Event
|
||||
var dataStr string
|
||||
|
||||
err := rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to scan event: %w", err)
|
||||
events := make([]Event, len(records))
|
||||
for i, record := range records {
|
||||
events[i] = Event{
|
||||
Seq: record.GetInt("seq"),
|
||||
Hash: record.GetString("hash"),
|
||||
ItemID: record.GetString("item_id"),
|
||||
EventID: record.GetString("event_id"),
|
||||
Collection: record.GetString("collection"),
|
||||
Operation: record.GetString("operation"),
|
||||
Path: record.GetString("path"),
|
||||
Value: record.GetString("value"),
|
||||
From: record.GetString("from"),
|
||||
}
|
||||
|
||||
// Parse patches from data field
|
||||
if dataStr != "" {
|
||||
var patches []PatchOperation
|
||||
if err := json.Unmarshal([]byte(dataStr), &patches); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal patches: %w", err)
|
||||
// Parse timestamp
|
||||
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
||||
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
||||
events[i].Timestamp = timestamp
|
||||
}
|
||||
event.Patches = patches
|
||||
}
|
||||
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return events, nil
|
||||
|
@@ -68,11 +68,14 @@ func (es *SimpleEventStore) MergeEventLog(cutoffDays int) error {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Rethink merging for single operations
|
||||
consolidatedEvent := &Event{
|
||||
Seq: nextSeq,
|
||||
ItemID: itemID,
|
||||
Collection: collectionName,
|
||||
Patches: patches,
|
||||
Operation: "add", // Placeholder - merging needs redesign
|
||||
Path: "/",
|
||||
Value: "consolidated",
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
|
@@ -24,7 +24,7 @@ func setupCollections(app core.App) error {
|
||||
eventsCollection.UpdateRule = nil
|
||||
eventsCollection.DeleteRule = nil
|
||||
|
||||
// Add fields
|
||||
// Add fields - ONE EVENT = ONE OPERATION
|
||||
eventsCollection.Fields.Add(&core.NumberField{
|
||||
Name: "seq",
|
||||
Required: true,
|
||||
@@ -46,7 +46,19 @@ func setupCollections(app core.App) error {
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "data",
|
||||
Name: "operation",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "path",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "value",
|
||||
Required: false,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "from",
|
||||
Required: false,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.DateField{
|
||||
|
18
types.go
18
types.go
@@ -2,7 +2,6 @@ package main
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -20,13 +19,16 @@ type Event struct {
|
||||
EventID string `json:"event_id"`
|
||||
// Collection of the item that is to be manipulated, defined by the client
|
||||
Collection string `json:"collection"`
|
||||
// RFC6902 JSON Patch operations that define the changes
|
||||
Patches []PatchOperation `json:"patches"`
|
||||
// Single operation - ONE EVENT = ONE OPERATION
|
||||
Operation string `json:"operation"` // add, remove, replace, move, copy, test
|
||||
Path string `json:"path"` // JSON Pointer to target location
|
||||
Value string `json:"value"` // Value as string (for add/replace operations)
|
||||
From string `json:"from"` // Source path for move/copy operations
|
||||
// Timestamp of the event - server generated, when the event was processed
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// PatchOperation represents a single RFC6902 JSON Patch operation
|
||||
// PatchOperation represents a single RFC6902 JSON Patch operation (still needed for JSONPatcher)
|
||||
type PatchOperation struct {
|
||||
Op string `json:"op"` // add, remove, replace, move, copy, test
|
||||
Path string `json:"path"` // JSON Pointer to target location
|
||||
@@ -62,12 +64,8 @@ type SyncResponse struct {
|
||||
func (e *Event) serialize() string {
|
||||
timestamp := e.Timestamp.Format(time.RFC3339Nano)
|
||||
|
||||
// Convert patches to JSON string
|
||||
patchesBytes, _ := json.Marshal(e.Patches)
|
||||
patchesStr := string(patchesBytes)
|
||||
|
||||
return fmt.Sprintf("seq:%d|item_id:%s|event_id:%s|collection:%s|patches:%s|timestamp:%s",
|
||||
e.Seq, e.ItemID, e.EventID, e.Collection, patchesStr, timestamp)
|
||||
return fmt.Sprintf("seq:%d|item_id:%s|event_id:%s|collection:%s|operation:%s|path:%s|value:%s|from:%s|timestamp:%s",
|
||||
e.Seq, e.ItemID, e.EventID, e.Collection, e.Operation, e.Path, e.Value, e.From, timestamp)
|
||||
}
|
||||
|
||||
// Calculate hash of event plus previous hash
|
||||
|
Reference in New Issue
Block a user