334 lines
8.0 KiB
Go
334 lines
8.0 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pocketbase/dbx"
|
|
"github.com/pocketbase/pocketbase"
|
|
)
|
|
|
|
// SimpleEventStore uses direct SQL queries for better compatibility
|
|
type SimpleEventStore struct {
|
|
app *pocketbase.PocketBase
|
|
}
|
|
|
|
func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore {
|
|
return &SimpleEventStore{app: app}
|
|
}
|
|
|
|
// GetLatestEvent returns the latest event from the event log
|
|
func (es *SimpleEventStore) GetLatestEvent() (*Event, error) {
|
|
rows, err := es.app.DB().
|
|
Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp").
|
|
From("events").
|
|
OrderBy("seq DESC").
|
|
Limit(1).
|
|
Rows()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query latest event: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
if !rows.Next() {
|
|
return nil, nil // No events found
|
|
}
|
|
|
|
var event Event
|
|
var dataStr string
|
|
|
|
err = rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan latest event: %w", err)
|
|
}
|
|
|
|
// Parse patches from data field
|
|
if dataStr != "" {
|
|
var patches []PatchOperation
|
|
if err := json.Unmarshal([]byte(dataStr), &patches); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal patches: %w", err)
|
|
}
|
|
event.Patches = patches
|
|
}
|
|
|
|
return &event, nil
|
|
}
|
|
|
|
// ProcessEvent processes an incoming event and applies it to the store
|
|
func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
|
// Get latest event for sequence number and hash chaining
|
|
latestEvent, err := es.GetLatestEvent()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get latest event: %w", err)
|
|
}
|
|
|
|
// Prepare the event
|
|
event := &Event{
|
|
ItemID: incomingEvent.ItemID,
|
|
Collection: incomingEvent.Collection,
|
|
Patches: incomingEvent.Patches,
|
|
EventID: uuid.New().String(),
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// Set sequence number
|
|
if latestEvent == nil {
|
|
event.Seq = 1
|
|
} else {
|
|
event.Seq = latestEvent.Seq + 1
|
|
}
|
|
|
|
// Calculate hash
|
|
prevHash := ""
|
|
if latestEvent != nil {
|
|
prevHash = latestEvent.Hash
|
|
}
|
|
event.Hash = event.calculateHash(prevHash)
|
|
|
|
// Save event to event log
|
|
if err := es.saveEvent(event); err != nil {
|
|
return nil, fmt.Errorf("failed to save event: %w", err)
|
|
}
|
|
|
|
// Apply the event to cached data
|
|
if err := es.applyEvent(event); err != nil {
|
|
return nil, fmt.Errorf("failed to apply event: %w", err)
|
|
}
|
|
|
|
return event, nil
|
|
}
|
|
|
|
// saveEvent saves an event to the events table
|
|
func (es *SimpleEventStore) saveEvent(event *Event) error {
|
|
// Convert patches to JSON string
|
|
patchesBytes, err := json.Marshal(event.Patches)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal event patches: %w", err)
|
|
}
|
|
|
|
_, err = es.app.DB().
|
|
Insert("events", map[string]any{
|
|
"id": uuid.New().String(),
|
|
"seq": event.Seq,
|
|
"hash": event.Hash,
|
|
"item_id": event.ItemID,
|
|
"event_id": event.EventID,
|
|
"collection": event.Collection,
|
|
"data": string(patchesBytes), // Store patches in data field
|
|
"timestamp": event.Timestamp,
|
|
}).Execute()
|
|
|
|
return err
|
|
}
|
|
|
|
// applyEvent applies an event to the cached data using JSON Patch operations
|
|
func (es *SimpleEventStore) applyEvent(event *Event) error {
|
|
// Get current document state
|
|
currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID)
|
|
if err != nil {
|
|
// If document doesn't exist, create empty one for patches to work on
|
|
currentDoc = map[string]interface{}{
|
|
"id": event.ItemID,
|
|
"created_at": event.Timestamp,
|
|
"updated_at": event.Timestamp,
|
|
}
|
|
}
|
|
|
|
// Apply JSON Patch operations
|
|
patcher := &JSONPatcher{}
|
|
updatedDoc, err := patcher.ApplyPatches(currentDoc, event.Patches)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to apply patches: %w", err)
|
|
}
|
|
|
|
// Always update the updated_at timestamp
|
|
updatedDoc["updated_at"] = event.Timestamp
|
|
|
|
// Save the updated document
|
|
return es.saveDocument(event.Collection, event.ItemID, updatedDoc)
|
|
}
|
|
|
|
// getCurrentDocument retrieves the current state of a document
|
|
func (es *SimpleEventStore) getCurrentDocument(collection, itemID string) (map[string]interface{}, error) {
|
|
var result map[string]interface{}
|
|
|
|
rows, err := es.app.DB().
|
|
Select("*").
|
|
From(collection).
|
|
Where(dbx.HashExp{"id": itemID}).
|
|
Limit(1).
|
|
Rows()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
if !rows.Next() {
|
|
return nil, fmt.Errorf("document not found")
|
|
}
|
|
|
|
// Get column names
|
|
columns, err := rows.Columns()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Scan values
|
|
values := make([]interface{}, len(columns))
|
|
valuePtrs := make([]interface{}, len(columns))
|
|
for i := range values {
|
|
valuePtrs[i] = &values[i]
|
|
}
|
|
|
|
if err := rows.Scan(valuePtrs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create map
|
|
result = make(map[string]interface{})
|
|
for i, col := range columns {
|
|
result[col] = values[i]
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// saveDocument saves a document to the database
|
|
func (es *SimpleEventStore) saveDocument(collection, itemID string, doc map[string]interface{}) error {
|
|
// Check if document exists
|
|
exists := false
|
|
_, err := es.getCurrentDocument(collection, itemID)
|
|
if err == nil {
|
|
exists = true
|
|
}
|
|
|
|
if exists {
|
|
// Update existing document
|
|
_, err = es.app.DB().
|
|
Update(collection, doc, dbx.HashExp{"id": itemID}).
|
|
Execute()
|
|
} else {
|
|
// Insert new document
|
|
_, err = es.app.DB().
|
|
Insert(collection, doc).
|
|
Execute()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// GetEventsSince returns events since the given sequence number
|
|
func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) {
|
|
rows, err := es.app.DB().
|
|
Select("seq", "hash", "item_id", "event_id", "collection", "data", "timestamp").
|
|
From("events").
|
|
Where(dbx.NewExp("seq > {:seq}", map[string]any{"seq": seq})).
|
|
OrderBy("seq ASC").
|
|
Rows()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch events: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var events []Event
|
|
for rows.Next() {
|
|
var event Event
|
|
var dataStr string
|
|
|
|
err := rows.Scan(&event.Seq, &event.Hash, &event.ItemID, &event.EventID, &event.Collection, &dataStr, &event.Timestamp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan event: %w", err)
|
|
}
|
|
|
|
// Parse patches from data field
|
|
if dataStr != "" {
|
|
var patches []PatchOperation
|
|
if err := json.Unmarshal([]byte(dataStr), &patches); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal patches: %w", err)
|
|
}
|
|
event.Patches = patches
|
|
}
|
|
|
|
events = append(events, event)
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// GetAllItems returns all non-deleted items from a collection
|
|
func (es *SimpleEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
|
var items []map[string]interface{}
|
|
|
|
rows, err := es.app.DB().
|
|
Select("*").
|
|
From(collection).
|
|
Where(dbx.NewExp("deleted_at IS NULL OR deleted_at = ''")).
|
|
OrderBy("created_at DESC").
|
|
Rows()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch items: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
// Get column names
|
|
columns, err := rows.Columns()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for rows.Next() {
|
|
// Create slice to hold values
|
|
values := make([]interface{}, len(columns))
|
|
valuePtrs := make([]interface{}, len(columns))
|
|
for i := range values {
|
|
valuePtrs[i] = &values[i]
|
|
}
|
|
|
|
// Scan into values
|
|
if err := rows.Scan(valuePtrs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create map from columns and values
|
|
item := make(map[string]interface{})
|
|
for i, col := range columns {
|
|
item[col] = values[i]
|
|
}
|
|
|
|
items = append(items, item)
|
|
}
|
|
|
|
return items, nil
|
|
}
|
|
|
|
// ValidateSync checks if client sync state matches server state
|
|
func (es *SimpleEventStore) ValidateSync(clientSeq int, clientHash string) (bool, error) {
|
|
if clientSeq == 0 {
|
|
return false, nil // Full sync needed
|
|
}
|
|
|
|
var hash string
|
|
err := es.app.DB().
|
|
Select("hash").
|
|
From("events").
|
|
Where(dbx.NewExp("seq = {:seq}", map[string]any{"seq": clientSeq})).
|
|
Limit(1).
|
|
One(&hash)
|
|
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return false, nil // Event not found, full sync needed
|
|
}
|
|
return false, fmt.Errorf("failed to validate sync: %w", err)
|
|
}
|
|
|
|
return hash == clientHash, nil
|
|
}
|