Files
event-driven-shoppinglist/event_store_simple.go
2025-09-29 08:14:53 +02:00

334 lines
8.0 KiB
Go

package main
import (
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase"
)
// SimpleEventStore uses direct SQL queries for better compatibility
type SimpleEventStore struct {
app *pocketbase.PocketBase
}
func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore {
return &SimpleEventStore{app: app}
}
// GetLatestEvent returns the latest event from the event log
func (es *SimpleEventStore) GetLatestEvent() (*Event, error) {
rows, err := es.app.DB().
Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp").
From("events").
OrderBy("seq DESC").
Limit(1).
Rows()
if err != nil {
return nil, fmt.Errorf("failed to query latest event: %w", err)
}
defer rows.Close()
if !rows.Next() {
return nil, nil // No events found
}
var event Event
var dataStr string
err = rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp)
if err != nil {
return nil, fmt.Errorf("failed to scan latest event: %w", err)
}
// Parse patches from data field
if dataStr != "" {
var patches []PatchOperation
if err := json.Unmarshal([]byte(dataStr), &patches); err != nil {
return nil, fmt.Errorf("failed to unmarshal patches: %w", err)
}
event.Patches = patches
}
return &event, nil
}
// ProcessEvent processes an incoming event and applies it to the store
func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
// Get latest event for sequence number and hash chaining
latestEvent, err := es.GetLatestEvent()
if err != nil {
return nil, fmt.Errorf("failed to get latest event: %w", err)
}
// Prepare the event
event := &Event{
ItemID: incomingEvent.ItemID,
Collection: incomingEvent.Collection,
Patches: incomingEvent.Patches,
EventID: uuid.New().String(),
Timestamp: time.Now(),
}
// Set sequence number
if latestEvent == nil {
event.Seq = 1
} else {
event.Seq = latestEvent.Seq + 1
}
// Calculate hash
prevHash := ""
if latestEvent != nil {
prevHash = latestEvent.Hash
}
event.Hash = event.calculateHash(prevHash)
// Save event to event log
if err := es.saveEvent(event); err != nil {
return nil, fmt.Errorf("failed to save event: %w", err)
}
// Apply the event to cached data
if err := es.applyEvent(event); err != nil {
return nil, fmt.Errorf("failed to apply event: %w", err)
}
return event, nil
}
// saveEvent saves an event to the events table
func (es *SimpleEventStore) saveEvent(event *Event) error {
// Convert patches to JSON string
patchesBytes, err := json.Marshal(event.Patches)
if err != nil {
return fmt.Errorf("failed to marshal event patches: %w", err)
}
_, err = es.app.DB().
Insert("events", map[string]any{
"id": uuid.New().String(),
"seq": event.Seq,
"hash": event.Hash,
"item_id": event.ItemID,
"event_id": event.EventID,
"collection": event.Collection,
"data": string(patchesBytes), // Store patches in data field
"timestamp": event.Timestamp,
}).Execute()
return err
}
// applyEvent applies an event to the cached data using JSON Patch operations
func (es *SimpleEventStore) applyEvent(event *Event) error {
// Get current document state
currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID)
if err != nil {
// If document doesn't exist, create empty one for patches to work on
currentDoc = map[string]interface{}{
"id": event.ItemID,
"created_at": event.Timestamp,
"updated_at": event.Timestamp,
}
}
// Apply JSON Patch operations
patcher := &JSONPatcher{}
updatedDoc, err := patcher.ApplyPatches(currentDoc, event.Patches)
if err != nil {
return fmt.Errorf("failed to apply patches: %w", err)
}
// Always update the updated_at timestamp
updatedDoc["updated_at"] = event.Timestamp
// Save the updated document
return es.saveDocument(event.Collection, event.ItemID, updatedDoc)
}
// getCurrentDocument retrieves the current state of a document
func (es *SimpleEventStore) getCurrentDocument(collection, itemID string) (map[string]interface{}, error) {
var result map[string]interface{}
rows, err := es.app.DB().
Select("*").
From(collection).
Where(dbx.HashExp{"id": itemID}).
Limit(1).
Rows()
if err != nil {
return nil, err
}
defer rows.Close()
if !rows.Next() {
return nil, fmt.Errorf("document not found")
}
// Get column names
columns, err := rows.Columns()
if err != nil {
return nil, err
}
// Scan values
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range values {
valuePtrs[i] = &values[i]
}
if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}
// Create map
result = make(map[string]interface{})
for i, col := range columns {
result[col] = values[i]
}
return result, nil
}
// saveDocument saves a document to the database
func (es *SimpleEventStore) saveDocument(collection, itemID string, doc map[string]interface{}) error {
// Check if document exists
exists := false
_, err := es.getCurrentDocument(collection, itemID)
if err == nil {
exists = true
}
if exists {
// Update existing document
_, err = es.app.DB().
Update(collection, doc, dbx.HashExp{"id": itemID}).
Execute()
} else {
// Insert new document
_, err = es.app.DB().
Insert(collection, doc).
Execute()
}
return err
}
// GetEventsSince returns events since the given sequence number
func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) {
rows, err := es.app.DB().
Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp").
From("events").
Where(dbx.NewExp("seq > {:seq}", map[string]any{"seq": seq})).
OrderBy("seq ASC").
Rows()
if err != nil {
return nil, fmt.Errorf("failed to fetch events: %w", err)
}
defer rows.Close()
var events []Event
for rows.Next() {
var event Event
var dataStr string
err := rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp)
if err != nil {
return nil, fmt.Errorf("failed to scan event: %w", err)
}
// Parse patches from data field
if dataStr != "" {
var patches []PatchOperation
if err := json.Unmarshal([]byte(dataStr), &patches); err != nil {
return nil, fmt.Errorf("failed to unmarshal patches: %w", err)
}
event.Patches = patches
}
events = append(events, event)
}
return events, nil
}
// GetAllItems returns all non-deleted items from a collection
func (es *SimpleEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
var items []map[string]interface{}
rows, err := es.app.DB().
Select("*").
From(collection).
Where(dbx.NewExp("deleted_at IS NULL OR deleted_at = ''")).
OrderBy("created_at DESC").
Rows()
if err != nil {
return nil, fmt.Errorf("failed to fetch items: %w", err)
}
defer rows.Close()
// Get column names
columns, err := rows.Columns()
if err != nil {
return nil, err
}
for rows.Next() {
// Create slice to hold values
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range values {
valuePtrs[i] = &values[i]
}
// Scan into values
if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}
// Create map from columns and values
item := make(map[string]interface{})
for i, col := range columns {
item[col] = values[i]
}
items = append(items, item)
}
return items, nil
}
// ValidateSync checks if client sync state matches server state
func (es *SimpleEventStore) ValidateSync(clientSeq int, clientHash string) (bool, error) {
if clientSeq == 0 {
return false, nil // Full sync needed
}
var hash string
err := es.app.DB().
Select("hash").
From("events").
Where(dbx.NewExp("seq = {:seq}", map[string]any{"seq": clientSeq})).
Limit(1).
One(&hash)
if err != nil {
if err == sql.ErrNoRows {
return false, nil // Event not found, full sync needed
}
return false, fmt.Errorf("failed to validate sync: %w", err)
}
return hash == clientHash, nil
}