332 lines
8.5 KiB
Go
332 lines
8.5 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pocketbase/dbx"
|
|
"github.com/pocketbase/pocketbase"
|
|
"github.com/pocketbase/pocketbase/core"
|
|
)
|
|
|
|
// SimpleEventStore uses direct SQL queries for better compatibility
|
|
type SimpleEventStore struct {
|
|
app *pocketbase.PocketBase
|
|
}
|
|
|
|
func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore {
|
|
return &SimpleEventStore{app: app}
|
|
}
|
|
|
|
// GetLatestEvent returns the latest event from the event log
|
|
func (es *SimpleEventStore) GetLatestEvent() (*Event, error) {
|
|
records, err := es.app.FindRecordsByFilter("events", "", "-seq", 1, 0, map[string]any{})
|
|
if err != nil || len(records) == 0 {
|
|
return nil, nil // No events found
|
|
}
|
|
|
|
record := records[0]
|
|
event := &Event{
|
|
Seq: record.GetInt("seq"),
|
|
Hash: record.GetString("hash"),
|
|
ItemID: record.GetString("item_id"),
|
|
EventID: record.GetString("event_id"),
|
|
Collection: record.GetString("collection"),
|
|
Operation: record.GetString("operation"),
|
|
Path: record.GetString("path"),
|
|
Value: record.GetString("value"),
|
|
From: record.GetString("from"),
|
|
}
|
|
|
|
// Parse timestamp
|
|
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
|
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
|
event.Timestamp = timestamp
|
|
}
|
|
}
|
|
|
|
return event, nil
|
|
}
|
|
|
|
// ProcessEvent processes an incoming event and applies it to the store
|
|
func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
|
// Get latest event for sequence number and hash chaining
|
|
latestEvent, err := es.GetLatestEvent()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get latest event: %w", err)
|
|
}
|
|
|
|
// Prepare the event
|
|
event := &Event{
|
|
ItemID: incomingEvent.ItemID,
|
|
Collection: incomingEvent.Collection,
|
|
Operation: incomingEvent.Operation,
|
|
Path: incomingEvent.Path,
|
|
Value: incomingEvent.Value,
|
|
From: incomingEvent.From,
|
|
EventID: uuid.New().String(),
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// Set sequence number
|
|
if latestEvent == nil {
|
|
event.Seq = 1
|
|
} else {
|
|
event.Seq = latestEvent.Seq + 1
|
|
}
|
|
|
|
// Calculate hash
|
|
prevHash := ""
|
|
if latestEvent != nil {
|
|
prevHash = latestEvent.Hash
|
|
}
|
|
event.Hash = event.calculateHash(prevHash)
|
|
|
|
// Save event to event log
|
|
if err := es.saveEvent(event); err != nil {
|
|
return nil, fmt.Errorf("failed to save event: %w", err)
|
|
}
|
|
|
|
// Apply the event to cached data
|
|
if err := es.applyEvent(event); err != nil {
|
|
return nil, fmt.Errorf("failed to apply event: %w", err)
|
|
}
|
|
|
|
return event, nil
|
|
}
|
|
|
|
// saveEvent saves an event to the events collection
|
|
func (es *SimpleEventStore) saveEvent(event *Event) error {
|
|
collection, err := es.app.FindCollectionByNameOrId("events")
|
|
if err != nil {
|
|
return fmt.Errorf("events collection not found: %w", err)
|
|
}
|
|
|
|
record := core.NewRecord(collection)
|
|
record.Set("seq", event.Seq)
|
|
record.Set("hash", event.Hash)
|
|
record.Set("item_id", event.ItemID)
|
|
record.Set("event_id", event.EventID)
|
|
record.Set("collection", event.Collection)
|
|
record.Set("operation", event.Operation)
|
|
record.Set("path", event.Path)
|
|
record.Set("value", event.Value)
|
|
record.Set("from", event.From)
|
|
record.Set("timestamp", event.Timestamp.Format(time.RFC3339))
|
|
|
|
return es.app.Save(record)
|
|
}
|
|
|
|
// applyEvent applies a single operation to the cached data
|
|
func (es *SimpleEventStore) applyEvent(event *Event) error {
|
|
// Get current document state
|
|
currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID)
|
|
if err != nil {
|
|
// If document doesn't exist, create empty one
|
|
currentDoc = map[string]interface{}{
|
|
"id": event.ItemID,
|
|
}
|
|
}
|
|
|
|
// Apply single operation
|
|
patcher := &JSONPatcher{}
|
|
patches := []PatchOperation{{
|
|
Op: event.Operation,
|
|
Path: event.Path,
|
|
Value: event.Value,
|
|
From: event.From,
|
|
}}
|
|
|
|
updatedDoc, err := patcher.ApplyPatches(currentDoc, patches)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to apply operation: %w", err)
|
|
}
|
|
|
|
// Update timestamp
|
|
updatedDoc["updated_at"] = event.Timestamp.Format(time.RFC3339)
|
|
|
|
// Set created_at if this is a new document
|
|
if _, exists := currentDoc["created_at"]; !exists {
|
|
updatedDoc["created_at"] = event.Timestamp.Format(time.RFC3339)
|
|
}
|
|
|
|
// Save the updated document
|
|
return es.saveDocument(event.Collection, event.ItemID, updatedDoc)
|
|
}
|
|
|
|
// getCurrentDocument retrieves the current state of a document
|
|
func (es *SimpleEventStore) getCurrentDocument(collection, itemID string) (map[string]interface{}, error) {
|
|
var result map[string]interface{}
|
|
|
|
rows, err := es.app.DB().
|
|
Select("*").
|
|
From(collection).
|
|
Where(dbx.HashExp{"id": itemID}).
|
|
Limit(1).
|
|
Rows()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
if !rows.Next() {
|
|
return nil, fmt.Errorf("document not found")
|
|
}
|
|
|
|
// Get column names
|
|
columns, err := rows.Columns()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Scan values
|
|
values := make([]interface{}, len(columns))
|
|
valuePtrs := make([]interface{}, len(columns))
|
|
for i := range values {
|
|
valuePtrs[i] = &values[i]
|
|
}
|
|
|
|
if err := rows.Scan(valuePtrs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create map
|
|
result = make(map[string]interface{})
|
|
for i, col := range columns {
|
|
result[col] = values[i]
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// saveDocument saves a document to the collection
|
|
func (es *SimpleEventStore) saveDocument(collectionName, itemID string, doc map[string]interface{}) error {
|
|
collection, err := es.app.FindCollectionByNameOrId(collectionName)
|
|
if err != nil {
|
|
return fmt.Errorf("collection %s not found: %w", collectionName, err)
|
|
}
|
|
|
|
// Try to find existing record
|
|
record, err := es.app.FindFirstRecordByFilter(collectionName, "id = {:id}", map[string]any{"id": itemID})
|
|
if err != nil {
|
|
// Record doesn't exist, create new one
|
|
record = core.NewRecord(collection)
|
|
record.Set("id", itemID)
|
|
}
|
|
|
|
// Set all fields from doc
|
|
for key, value := range doc {
|
|
if key != "id" { // Don't overwrite the ID
|
|
record.Set(key, value)
|
|
}
|
|
}
|
|
|
|
return es.app.Save(record)
|
|
}
|
|
|
|
// GetEventsSince returns events since the given sequence number
|
|
func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) {
|
|
records, err := es.app.FindRecordsByFilter("events", "seq > {:seq}", "seq", 1000, 0, map[string]any{"seq": seq})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch events: %w", err)
|
|
}
|
|
|
|
events := make([]Event, len(records))
|
|
for i, record := range records {
|
|
events[i] = Event{
|
|
Seq: record.GetInt("seq"),
|
|
Hash: record.GetString("hash"),
|
|
ItemID: record.GetString("item_id"),
|
|
EventID: record.GetString("event_id"),
|
|
Collection: record.GetString("collection"),
|
|
Operation: record.GetString("operation"),
|
|
Path: record.GetString("path"),
|
|
Value: record.GetString("value"),
|
|
From: record.GetString("from"),
|
|
}
|
|
|
|
// Parse timestamp
|
|
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
|
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
|
events[i].Timestamp = timestamp
|
|
}
|
|
}
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// GetAllItems returns all non-deleted items from a collection
|
|
func (es *SimpleEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
|
var items []map[string]interface{}
|
|
|
|
rows, err := es.app.DB().
|
|
Select("*").
|
|
From(collection).
|
|
Where(dbx.NewExp("deleted_at IS NULL OR deleted_at = ''")).
|
|
OrderBy("created_at DESC").
|
|
Rows()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch items: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
// Get column names
|
|
columns, err := rows.Columns()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for rows.Next() {
|
|
// Create slice to hold values
|
|
values := make([]interface{}, len(columns))
|
|
valuePtrs := make([]interface{}, len(columns))
|
|
for i := range values {
|
|
valuePtrs[i] = &values[i]
|
|
}
|
|
|
|
// Scan into values
|
|
if err := rows.Scan(valuePtrs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create map from columns and values
|
|
item := make(map[string]interface{})
|
|
for i, col := range columns {
|
|
item[col] = values[i]
|
|
}
|
|
|
|
items = append(items, item)
|
|
}
|
|
|
|
return items, nil
|
|
}
|
|
|
|
// ValidateSync checks if client sync state matches server state
|
|
func (es *SimpleEventStore) ValidateSync(clientSeq int, clientHash string) (bool, error) {
|
|
if clientSeq == 0 {
|
|
return false, nil // Full sync needed
|
|
}
|
|
|
|
var hash string
|
|
err := es.app.DB().
|
|
Select("hash").
|
|
From("events").
|
|
Where(dbx.NewExp("seq = {:seq}", map[string]any{"seq": clientSeq})).
|
|
Limit(1).
|
|
One(&hash)
|
|
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return false, nil // Event not found, full sync needed
|
|
}
|
|
return false, fmt.Errorf("failed to validate sync: %w", err)
|
|
}
|
|
|
|
return hash == clientHash, nil
|
|
}
|