package main

import (
	"database/sql"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/pocketbase/dbx"
	"github.com/pocketbase/pocketbase"
	"github.com/pocketbase/pocketbase/core"
)

// errDocumentNotFound is returned by getCurrentDocument when no row matches
// the requested id. It lets applyEvent tell "the document does not exist yet"
// apart from a real database failure.
var errDocumentNotFound = errors.New("document not found")

// SimpleEventStore is an event store backed by a PocketBase app. Events are
// appended to the "events" collection and each event is applied as a single
// patch operation to a document in the collection the event names.
type SimpleEventStore struct {
	app *pocketbase.PocketBase
}

// NewSimpleEventStore returns a SimpleEventStore that uses app for all
// collection and database access.
func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore {
	return &SimpleEventStore{app: app}
}

// rowScanner is the subset of the row cursor API that scanRowToMap needs.
type rowScanner interface {
	Scan(dest ...any) error
}

// scanRowToMap scans the current row of rows into a map keyed by column name.
// columns must be the column list of the result set, in order.
func scanRowToMap(rows rowScanner, columns []string) (map[string]any, error) {
	values := make([]any, len(columns))
	valuePtrs := make([]any, len(columns))
	for i := range values {
		valuePtrs[i] = &values[i]
	}

	if err := rows.Scan(valuePtrs...); err != nil {
		return nil, err
	}

	row := make(map[string]any, len(columns))
	for i, col := range columns {
		row[col] = values[i]
	}
	return row, nil
}

// recordToEvent converts a record of the "events" collection into an Event.
// The timestamp is parsed as RFC 3339; if it is empty or does not parse, the
// Event keeps the zero time.
func recordToEvent(record *core.Record) Event {
	event := Event{
		Seq:        record.GetInt("seq"),
		Hash:       record.GetString("hash"),
		ItemID:     record.GetString("item_id"),
		EventID:    record.GetString("event_id"),
		Collection: record.GetString("collection"),
		Operation:  record.GetString("operation"),
		Path:       record.GetString("path"),
		Value:      record.GetString("value"),
		From:       record.GetString("from"),
	}

	if timestampStr := record.GetString("timestamp"); timestampStr != "" {
		if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
			event.Timestamp = timestamp
		}
	}

	return event
}

// GetLatestEvent returns the event with the highest sequence number in the
// event log. It returns (nil, nil) when the log is empty and a non-nil error
// when the lookup itself fails, so callers never mistake a failed query for
// an empty log.
func (es *SimpleEventStore) GetLatestEvent() (*Event, error) {
	records, err := es.app.FindRecordsByFilter("events", "", "-seq", 1, 0, map[string]any{})
	if err != nil {
		return nil, fmt.Errorf("failed to fetch latest event: %w", err)
	}
	if len(records) == 0 {
		return nil, nil // No events found
	}

	event := recordToEvent(records[0])
	return &event, nil
}

// ProcessEvent turns an incoming event into the next entry of the event log
// and applies it to the stored document.
//
// The stored event copies the item, collection, operation, path, value and
// from fields of incomingEvent, and gets a fresh event ID, the current time,
// the next sequence number (1 for an empty log) and a hash chained to the
// previous event's hash. The stored event is returned.
//
// NOTE(review): reading the latest event, saving the new one and applying it
// are separate steps with no transaction visible here; concurrent callers
// could compute the same sequence number, and a failed apply leaves the event
// in the log. Confirm callers serialize access.
func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
	// Get latest event for sequence number and hash chaining
	latestEvent, err := es.GetLatestEvent()
	if err != nil {
		return nil, fmt.Errorf("failed to get latest event: %w", err)
	}

	// Prepare the event
	event := &Event{
		ItemID:     incomingEvent.ItemID,
		Collection: incomingEvent.Collection,
		Operation:  incomingEvent.Operation,
		Path:       incomingEvent.Path,
		Value:      incomingEvent.Value,
		From:       incomingEvent.From,
		EventID:    uuid.New().String(),
		Timestamp:  time.Now(),
	}

	// Sequence number and previous hash both come from the latest event;
	// an empty log starts at seq 1 with an empty previous hash.
	event.Seq = 1
	prevHash := ""
	if latestEvent != nil {
		event.Seq = latestEvent.Seq + 1
		prevHash = latestEvent.Hash
	}
	event.Hash = event.calculateHash(prevHash)

	// Save event to event log
	if err := es.saveEvent(event); err != nil {
		return nil, fmt.Errorf("failed to save event: %w", err)
	}

	// Apply the event to cached data
	if err := es.applyEvent(event); err != nil {
		return nil, fmt.Errorf("failed to apply event: %w", err)
	}

	return event, nil
}

// saveEvent stores event as a new record in the "events" collection. The
// timestamp is written as an RFC 3339 string.
func (es *SimpleEventStore) saveEvent(event *Event) error {
	collection, err := es.app.FindCollectionByNameOrId("events")
	if err != nil {
		return fmt.Errorf("events collection not found: %w", err)
	}

	record := core.NewRecord(collection)
	record.Set("seq", event.Seq)
	record.Set("hash", event.Hash)
	record.Set("item_id", event.ItemID)
	record.Set("event_id", event.EventID)
	record.Set("collection", event.Collection)
	record.Set("operation", event.Operation)
	record.Set("path", event.Path)
	record.Set("value", event.Value)
	record.Set("from", event.From)
	record.Set("timestamp", event.Timestamp.Format(time.RFC3339))

	return es.app.Save(record)
}

// applyEvent applies the single patch operation carried by event to the
// document event.ItemID in event.Collection and saves the result.
//
// If the document does not exist yet, the operation is applied to a new
// document containing only the id. Any other failure to load the document is
// returned rather than treated as "missing", so a database error cannot cause
// an existing document to be patched from an empty state.
func (es *SimpleEventStore) applyEvent(event *Event) error {
	// Get current document state
	currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID)
	switch {
	case errors.Is(err, errDocumentNotFound):
		// If document doesn't exist, create empty one
		currentDoc = map[string]any{
			"id": event.ItemID,
		}
	case err != nil:
		return fmt.Errorf("failed to load current document: %w", err)
	}

	// Apply single operation
	patcher := &JSONPatcher{}
	patches := []PatchOperation{{
		Op:    event.Operation,
		Path:  event.Path,
		Value: event.Value,
		From:  event.From,
	}}

	updatedDoc, err := patcher.ApplyPatches(currentDoc, patches)
	if err != nil {
		return fmt.Errorf("failed to apply operation: %w", err)
	}

	// Update timestamp
	stamp := event.Timestamp.Format(time.RFC3339)
	updatedDoc["updated_at"] = stamp

	// Set created_at if this is a new document
	if _, exists := currentDoc["created_at"]; !exists {
		updatedDoc["created_at"] = stamp
	}

	// Save the updated document
	return es.saveDocument(event.Collection, event.ItemID, updatedDoc)
}

// getCurrentDocument loads the row with the given id from the collection's
// table and returns it as a map keyed by column name. It returns
// errDocumentNotFound when no row matches; any other error is a query or scan
// failure.
func (es *SimpleEventStore) getCurrentDocument(collection, itemID string) (map[string]any, error) {
	rows, err := es.app.DB().
		Select("*").
		From(collection).
		Where(dbx.HashExp{"id": itemID}).
		Limit(1).
		Rows()
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Next returns false both for an empty result and for an iteration
		// error; only the former means the document is missing.
		if err := rows.Err(); err != nil {
			return nil, err
		}
		return nil, errDocumentNotFound
	}

	// Get column names
	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	return scanRowToMap(rows, columns)
}

// saveDocument writes doc to the record itemID of the named collection,
// creating the record if it does not exist. Every key of doc except "id" is
// set on the record. A lookup failure other than "no rows" is returned rather
// than treated as a missing record.
func (es *SimpleEventStore) saveDocument(collectionName, itemID string, doc map[string]any) error {
	collection, err := es.app.FindCollectionByNameOrId(collectionName)
	if err != nil {
		return fmt.Errorf("collection %s not found: %w", collectionName, err)
	}

	// Try to find existing record
	record, err := es.app.FindFirstRecordByFilter(collectionName, "id = {:id}", map[string]any{"id": itemID})
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			return fmt.Errorf("failed to look up record %s in %s: %w", itemID, collectionName, err)
		}
		// Record doesn't exist, create new one
		record = core.NewRecord(collection)
		record.Set("id", itemID)
	}

	// Set all fields from doc
	for key, value := range doc {
		if key != "id" { // Don't overwrite the ID
			record.Set(key, value)
		}
	}

	return es.app.Save(record)
}

// GetEventsSince returns the events whose sequence number is greater than
// seq, ordered by ascending sequence number. At most 1000 events are returned
// per call; callers that need more must call again with the last seq they
// received.
func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) {
	records, err := es.app.FindRecordsByFilter("events", "seq > {:seq}", "seq", 1000, 0, map[string]any{"seq": seq})
	if err != nil {
		return nil, fmt.Errorf("failed to fetch events: %w", err)
	}

	events := make([]Event, len(records))
	for i, record := range records {
		events[i] = recordToEvent(record)
	}

	return events, nil
}

// GetAllItems returns every row of the collection's table whose deleted_at is
// NULL or empty, newest created_at first. Each row is a map keyed by column
// name. The result is nil when there are no matching rows.
func (es *SimpleEventStore) GetAllItems(collection string) ([]map[string]any, error) {
	rows, err := es.app.DB().
		Select("*").
		From(collection).
		Where(dbx.NewExp("deleted_at IS NULL OR deleted_at = ''")).
		OrderBy("created_at DESC").
		Rows()
	if err != nil {
		return nil, fmt.Errorf("failed to fetch items: %w", err)
	}
	defer rows.Close()

	// Get column names
	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	var items []map[string]any
	for rows.Next() {
		item, err := scanRowToMap(rows, columns)
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}

	// Next also returns false on an iteration error; without this check a
	// truncated result would be returned as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to fetch items: %w", err)
	}

	return items, nil
}

// ValidateSync reports whether the client's sync state matches the server's:
// the event with sequence number clientSeq must exist and its hash must equal
// clientHash. It returns false with a nil error when clientSeq is 0 or no
// such event exists, meaning a full sync is needed.
func (es *SimpleEventStore) ValidateSync(clientSeq int, clientHash string) (bool, error) {
	if clientSeq == 0 {
		return false, nil // Full sync needed
	}

	// Row scans a single column into a plain variable; One only accepts a
	// struct or NullStringMap destination and would reject *string.
	var hash string
	err := es.app.DB().
		Select("hash").
		From("events").
		Where(dbx.NewExp("seq = {:seq}", map[string]any{"seq": clientSeq})).
		Limit(1).
		Row(&hash)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return false, nil // Event not found, full sync needed
		}
		return false, fmt.Errorf("failed to validate sync: %w", err)
	}

	return hash == clientHash, nil
}