Compare commits
8 Commits
045a9d5e84
...
ff0d0b3bd3
Author | SHA1 | Date | |
---|---|---|---|
ff0d0b3bd3 | |||
6b7a519be9 | |||
607dd465a7 | |||
5f21d144c0 | |||
e9047ef2cb | |||
86c948beee | |||
7639706f5b | |||
c485f66476 |
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
*.exe
|
||||
pb_data
|
183
api.go
Normal file
183
api.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
)
|
||||
|
||||
// setupAPIRoutes sets up the event store API routes
|
||||
func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) {
|
||||
app.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
||||
// JSON Patch endpoint using PATCH method - ONE OPERATION PER REQUEST
|
||||
se.Router.PATCH("/api/collections/{collection}/items/{itemId}", func(e *core.RequestEvent) error {
|
||||
collection := e.Request.PathValue("collection")
|
||||
itemID := e.Request.PathValue("itemId")
|
||||
|
||||
if collection == "" || itemID == "" {
|
||||
return e.BadRequestError("Collection and itemId are required", nil)
|
||||
}
|
||||
|
||||
var operation struct {
|
||||
Op string `json:"op"`
|
||||
Path string `json:"path"`
|
||||
Value string `json:"value"`
|
||||
From string `json:"from"`
|
||||
}
|
||||
if err := e.BindBody(&operation); err != nil {
|
||||
return e.BadRequestError("Failed to parse operation data", err)
|
||||
}
|
||||
|
||||
// Create event with single operation
|
||||
incomingEvent := &Event{
|
||||
ItemID: itemID,
|
||||
Collection: collection,
|
||||
Operation: operation.Op,
|
||||
Path: operation.Path,
|
||||
Value: operation.Value,
|
||||
From: operation.From,
|
||||
}
|
||||
|
||||
// Process the event
|
||||
processedEvent, err := eventStore.ProcessEvent(incomingEvent)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to process event", err)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, processedEvent)
|
||||
})
|
||||
|
||||
// Legacy POST endpoint for compatibility
|
||||
se.Router.POST("/api/events", func(e *core.RequestEvent) error {
|
||||
var incomingEvent Event
|
||||
if err := e.BindBody(&incomingEvent); err != nil {
|
||||
return e.BadRequestError("Failed to parse event data", err)
|
||||
}
|
||||
|
||||
// Validate required fields
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
return e.BadRequestError("Missing required fields: item_id, collection, operation", nil)
|
||||
}
|
||||
|
||||
// Process the event
|
||||
processedEvent, err := eventStore.ProcessEvent(&incomingEvent)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to process event", err)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusCreated, processedEvent)
|
||||
})
|
||||
|
||||
// Sync endpoint for clients
|
||||
se.Router.POST("/api/sync", func(e *core.RequestEvent) error {
|
||||
var syncReq SyncRequest
|
||||
if err := e.BindBody(&syncReq); err != nil {
|
||||
return e.BadRequestError("Failed to parse sync request", err)
|
||||
}
|
||||
|
||||
// Check if client is in sync
|
||||
isValid, err := eventStore.ValidateSync(syncReq.LastSeq, syncReq.LastHash)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to validate sync", err)
|
||||
}
|
||||
|
||||
var response SyncResponse
|
||||
|
||||
if !isValid {
|
||||
// Full sync needed - send all events
|
||||
events, err := eventStore.GetEventsSince(0)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get events", err)
|
||||
}
|
||||
response.Events = events
|
||||
response.FullSync = true
|
||||
} else {
|
||||
// Incremental sync - send events since last sequence
|
||||
events, err := eventStore.GetEventsSince(syncReq.LastSeq)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get events", err)
|
||||
}
|
||||
response.Events = events
|
||||
response.FullSync = false
|
||||
}
|
||||
|
||||
// Get current state
|
||||
latestEvent, err := eventStore.GetLatestEvent()
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get latest event", err)
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response.CurrentSeq = latestEvent.Seq
|
||||
response.CurrentHash = latestEvent.Hash
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, response)
|
||||
})
|
||||
|
||||
// Get all items endpoint
|
||||
se.Router.GET("/api/items/{collection}", func(e *core.RequestEvent) error {
|
||||
collection := e.Request.PathValue("collection")
|
||||
if collection == "" {
|
||||
return e.BadRequestError("Collection name required", nil)
|
||||
}
|
||||
|
||||
items, err := eventStore.GetAllItems(collection)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get items", err)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, map[string]interface{}{
|
||||
"items": items,
|
||||
})
|
||||
})
|
||||
|
||||
// Batch events endpoint for client to send multiple events
|
||||
se.Router.POST("/api/events/batch", func(e *core.RequestEvent) error {
|
||||
var events []Event
|
||||
if err := e.BindBody(&events); err != nil {
|
||||
return e.BadRequestError("Failed to parse events data", err)
|
||||
}
|
||||
|
||||
processedEvents := make([]Event, 0, len(events))
|
||||
for _, incomingEvent := range events {
|
||||
// Validate required fields
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
return e.BadRequestError("Missing required fields in event: item_id, collection, operation", nil)
|
||||
}
|
||||
|
||||
processedEvent, err := eventStore.ProcessEvent(&incomingEvent)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to process event", err)
|
||||
}
|
||||
processedEvents = append(processedEvents, *processedEvent)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusCreated, map[string]interface{}{
|
||||
"events": processedEvents,
|
||||
})
|
||||
})
|
||||
|
||||
// Get current state endpoint
|
||||
se.Router.GET("/api/state", func(e *core.RequestEvent) error {
|
||||
latestEvent, err := eventStore.GetLatestEvent()
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get latest event", err)
|
||||
}
|
||||
|
||||
response := map[string]interface{}{
|
||||
"seq": 0,
|
||||
"hash": "",
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response["seq"] = latestEvent.Seq
|
||||
response["hash"] = latestEvent.Hash
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, response)
|
||||
})
|
||||
|
||||
return se.Next()
|
||||
})
|
||||
}
|
627
api_test.go
Normal file
627
api_test.go
Normal file
@@ -0,0 +1,627 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// MockSimpleEventStore for API testing
|
||||
type MockAPIEventStore struct {
|
||||
events []Event
|
||||
items map[string]map[string]interface{}
|
||||
nextSeq int
|
||||
}
|
||||
|
||||
func NewMockAPIEventStore() *MockAPIEventStore {
|
||||
return &MockAPIEventStore{
|
||||
events: []Event{},
|
||||
items: make(map[string]map[string]interface{}),
|
||||
nextSeq: 1,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) GetLatestEvent() (*Event, error) {
|
||||
if len(m.events) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return &m.events[len(m.events)-1], nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
||||
event := &Event{
|
||||
Seq: m.nextSeq,
|
||||
Hash: "mock_hash_" + string(rune(m.nextSeq+'0')),
|
||||
ItemID: incomingEvent.ItemID,
|
||||
EventID: "mock_event_" + string(rune(m.nextSeq+'0')),
|
||||
Collection: incomingEvent.Collection,
|
||||
Operation: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
m.events = append(m.events, *event)
|
||||
m.nextSeq++
|
||||
|
||||
// Apply operation to items for testing
|
||||
itemKey := incomingEvent.Collection + ":" + incomingEvent.ItemID
|
||||
if m.items[itemKey] == nil {
|
||||
m.items[itemKey] = map[string]interface{}{"id": incomingEvent.ItemID}
|
||||
}
|
||||
|
||||
// Simple mock application of JSON Patch operations
|
||||
switch incomingEvent.Operation {
|
||||
case "add", "replace":
|
||||
field := incomingEvent.Path[1:] // remove leading "/"
|
||||
m.items[itemKey][field] = incomingEvent.Value
|
||||
case "remove":
|
||||
field := incomingEvent.Path[1:]
|
||||
delete(m.items[itemKey], field)
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
var events []Event
|
||||
for _, event := range m.events {
|
||||
if event.Seq > seq {
|
||||
events = append(events, event)
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
var items []map[string]interface{}
|
||||
for key, item := range m.items {
|
||||
if len(key) >= len(collection) && key[:len(collection)] == collection {
|
||||
// Skip soft-deleted items
|
||||
if deletedAt, exists := item["deleted_at"]; !exists || deletedAt == "" {
|
||||
items = append(items, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) ValidateSync(seq int, hash string) (bool, error) {
|
||||
if len(m.events) == 0 {
|
||||
return seq == 0 && hash == "", nil
|
||||
}
|
||||
latest := m.events[len(m.events)-1]
|
||||
return latest.Seq == seq && latest.Hash == hash, nil
|
||||
}
|
||||
|
||||
func createTestRouter(eventStore *MockAPIEventStore) *http.ServeMux {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
// PATCH endpoint for single operations
|
||||
mux.HandleFunc("PATCH /api/collections/{collection}/items/{itemId}", func(w http.ResponseWriter, r *http.Request) {
|
||||
collection := r.PathValue("collection")
|
||||
itemID := r.PathValue("itemId")
|
||||
|
||||
if collection == "" || itemID == "" {
|
||||
http.Error(w, "Collection and itemId are required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var operation struct {
|
||||
Op string `json:"op"`
|
||||
Path string `json:"path"`
|
||||
Value string `json:"value"`
|
||||
From string `json:"from"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&operation); err != nil {
|
||||
http.Error(w, "Failed to parse operation data", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
incomingEvent := &Event{
|
||||
ItemID: itemID,
|
||||
Collection: collection,
|
||||
Operation: operation.Op,
|
||||
Path: operation.Path,
|
||||
Value: operation.Value,
|
||||
From: operation.From,
|
||||
}
|
||||
|
||||
processedEvent, err := eventStore.ProcessEvent(incomingEvent)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to process event", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(processedEvent)
|
||||
})
|
||||
|
||||
// POST endpoint for legacy events
|
||||
mux.HandleFunc("POST /api/events", func(w http.ResponseWriter, r *http.Request) {
|
||||
var incomingEvent Event
|
||||
if err := json.NewDecoder(r.Body).Decode(&incomingEvent); err != nil {
|
||||
http.Error(w, "Failed to parse event data", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
http.Error(w, "Missing required fields: item_id, collection, operation", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
processedEvent, err := eventStore.ProcessEvent(&incomingEvent)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to process event", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
json.NewEncoder(w).Encode(processedEvent)
|
||||
})
|
||||
|
||||
// Sync endpoint
|
||||
mux.HandleFunc("POST /api/sync", func(w http.ResponseWriter, r *http.Request) {
|
||||
var syncReq SyncRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&syncReq); err != nil {
|
||||
http.Error(w, "Failed to parse sync request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
isValid, err := eventStore.ValidateSync(syncReq.LastSeq, syncReq.LastHash)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to validate sync", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
events, err := eventStore.GetEventsSince(syncReq.LastSeq)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to get events", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
latestEvent, _ := eventStore.GetLatestEvent()
|
||||
|
||||
response := SyncResponse{
|
||||
Events: events,
|
||||
FullSync: !isValid,
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response.CurrentSeq = latestEvent.Seq
|
||||
response.CurrentHash = latestEvent.Hash
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
// Get items endpoint
|
||||
mux.HandleFunc("GET /api/items/{collection}", func(w http.ResponseWriter, r *http.Request) {
|
||||
collection := r.PathValue("collection")
|
||||
|
||||
items, err := eventStore.GetAllItems(collection)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to get items", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response := map[string]interface{}{"items": items}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
// Health endpoint
|
||||
mux.HandleFunc("GET /api/health", func(w http.ResponseWriter, r *http.Request) {
|
||||
response := map[string]interface{}{
|
||||
"message": "API is healthy.",
|
||||
"code": 200,
|
||||
"data": map[string]interface{}{},
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
// State endpoint
|
||||
mux.HandleFunc("GET /api/state", func(w http.ResponseWriter, r *http.Request) {
|
||||
latestEvent, _ := eventStore.GetLatestEvent()
|
||||
|
||||
response := map[string]interface{}{
|
||||
"seq": 0,
|
||||
"hash": "",
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response["seq"] = latestEvent.Seq
|
||||
response["hash"] = latestEvent.Hash
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
func TestAPI_PatchEndpoint_CreateItem(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
operation := map[string]string{
|
||||
"op": "add",
|
||||
"path": "/content",
|
||||
"value": "test item content",
|
||||
}
|
||||
body, _ := json.Marshal(operation)
|
||||
|
||||
req := httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response Event
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", response.ItemID)
|
||||
assert.Equal(t, "shopping_items", response.Collection)
|
||||
assert.Equal(t, "add", response.Operation)
|
||||
assert.Equal(t, "/content", response.Path)
|
||||
assert.Equal(t, "test item content", response.Value)
|
||||
assert.Equal(t, 1, response.Seq)
|
||||
}
|
||||
|
||||
func TestAPI_PatchEndpoint_UpdateItem(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Create item first
|
||||
createOp := map[string]string{
|
||||
"op": "add",
|
||||
"path": "/content",
|
||||
"value": "original content",
|
||||
}
|
||||
body, _ := json.Marshal(createOp)
|
||||
req := httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
// Update item
|
||||
updateOp := map[string]string{
|
||||
"op": "replace",
|
||||
"path": "/content",
|
||||
"value": "updated content",
|
||||
}
|
||||
body, _ = json.Marshal(updateOp)
|
||||
req = httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w = httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response Event
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", response.ItemID)
|
||||
assert.Equal(t, "replace", response.Operation)
|
||||
assert.Equal(t, "/content", response.Path)
|
||||
assert.Equal(t, "updated content", response.Value)
|
||||
assert.Equal(t, 2, response.Seq)
|
||||
}
|
||||
|
||||
func TestAPI_PatchEndpoint_InvalidData(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
req := httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader([]byte("invalid json")))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
}
|
||||
|
||||
func TestAPI_PostEndpoint_LegacyEvent(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
event := Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test content",
|
||||
}
|
||||
body, _ := json.Marshal(event)
|
||||
|
||||
req := httptest.NewRequest("POST", "/api/events", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusCreated, w.Code)
|
||||
|
||||
var response Event
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", response.ItemID)
|
||||
assert.Equal(t, "add", response.Operation)
|
||||
}
|
||||
|
||||
func TestAPI_PostEndpoint_MissingFields(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
event Event
|
||||
}{
|
||||
{
|
||||
name: "missing item_id",
|
||||
event: Event{
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing collection",
|
||||
event: Event{
|
||||
ItemID: "test123",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing operation",
|
||||
event: Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
body, _ := json.Marshal(tt.event)
|
||||
req := httptest.NewRequest("POST", "/api/events", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPI_SyncEndpoint_FullSync(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Add some events first
|
||||
event1 := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event1)
|
||||
|
||||
event2 := &Event{
|
||||
ItemID: "item2",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 2",
|
||||
}
|
||||
mockStore.ProcessEvent(event2)
|
||||
|
||||
// Request sync from beginning
|
||||
syncReq := SyncRequest{
|
||||
LastSeq: 0,
|
||||
LastHash: "",
|
||||
}
|
||||
body, _ := json.Marshal(syncReq)
|
||||
|
||||
req := httptest.NewRequest("POST", "/api/sync", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response SyncResponse
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, response.Events, 2)
|
||||
assert.Equal(t, 2, response.CurrentSeq)
|
||||
}
|
||||
|
||||
func TestAPI_SyncEndpoint_IncrementalSync(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Add some events first
|
||||
event1 := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event1)
|
||||
|
||||
event2 := &Event{
|
||||
ItemID: "item2",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 2",
|
||||
}
|
||||
mockStore.ProcessEvent(event2)
|
||||
|
||||
// Request sync from seq 1
|
||||
syncReq := SyncRequest{
|
||||
LastSeq: 1,
|
||||
LastHash: "mock_hash_1",
|
||||
}
|
||||
body, _ := json.Marshal(syncReq)
|
||||
|
||||
req := httptest.NewRequest("POST", "/api/sync", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response SyncResponse
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, response.Events, 1) // Only event 2
|
||||
assert.Equal(t, 2, response.CurrentSeq)
|
||||
}
|
||||
|
||||
func TestAPI_GetItemsEndpoint(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Add some items
|
||||
event1 := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event1)
|
||||
|
||||
event2 := &Event{
|
||||
ItemID: "item2",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 2",
|
||||
}
|
||||
mockStore.ProcessEvent(event2)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/items/shopping_items", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response map[string]interface{}
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
|
||||
items, ok := response["items"].([]interface{})
|
||||
require.True(t, ok)
|
||||
assert.Len(t, items, 2)
|
||||
}
|
||||
|
||||
func TestAPI_GetStateEndpoint(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Test empty state
|
||||
req := httptest.NewRequest("GET", "/api/state", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response map[string]interface{}
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, float64(0), response["seq"])
|
||||
assert.Equal(t, "", response["hash"])
|
||||
|
||||
// Add an event
|
||||
event := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event)
|
||||
|
||||
// Test state with event
|
||||
req = httptest.NewRequest("GET", "/api/state", nil)
|
||||
w = httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
err = json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, float64(1), response["seq"])
|
||||
assert.NotEmpty(t, response["hash"])
|
||||
}
|
||||
|
||||
func TestAPI_PathValues(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
url string
|
||||
method string
|
||||
expectedStatus int
|
||||
}{
|
||||
{
|
||||
name: "valid patch path",
|
||||
url: "/api/collections/shopping_items/items/test123",
|
||||
method: "PATCH",
|
||||
expectedStatus: http.StatusOK, // Valid operation should succeed
|
||||
},
|
||||
{
|
||||
name: "valid get items path",
|
||||
url: "/api/items/shopping_items",
|
||||
method: "GET",
|
||||
expectedStatus: http.StatusOK,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var req *http.Request
|
||||
if tt.method == "PATCH" {
|
||||
req = httptest.NewRequest(tt.method, tt.url, bytes.NewReader([]byte("{}")))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
} else {
|
||||
req = httptest.NewRequest(tt.method, tt.url, nil)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, tt.expectedStatus, w.Code)
|
||||
})
|
||||
}
|
||||
}
|
331
event_store_simple.go
Normal file
331
event_store_simple.go
Normal file
@@ -0,0 +1,331 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pocketbase/dbx"
|
||||
"github.com/pocketbase/pocketbase"
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
)
|
||||
|
||||
// SimpleEventStore uses direct SQL queries for better compatibility
|
||||
type SimpleEventStore struct {
|
||||
app *pocketbase.PocketBase
|
||||
}
|
||||
|
||||
func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore {
|
||||
return &SimpleEventStore{app: app}
|
||||
}
|
||||
|
||||
// GetLatestEvent returns the latest event from the event log
|
||||
func (es *SimpleEventStore) GetLatestEvent() (*Event, error) {
|
||||
records, err := es.app.FindRecordsByFilter("events", "", "-seq", 1, 0, map[string]any{})
|
||||
if err != nil || len(records) == 0 {
|
||||
return nil, nil // No events found
|
||||
}
|
||||
|
||||
record := records[0]
|
||||
event := &Event{
|
||||
Seq: record.GetInt("seq"),
|
||||
Hash: record.GetString("hash"),
|
||||
ItemID: record.GetString("item_id"),
|
||||
EventID: record.GetString("event_id"),
|
||||
Collection: record.GetString("collection"),
|
||||
Operation: record.GetString("operation"),
|
||||
Path: record.GetString("path"),
|
||||
Value: record.GetString("value"),
|
||||
From: record.GetString("from"),
|
||||
}
|
||||
|
||||
// Parse timestamp
|
||||
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
||||
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
||||
event.Timestamp = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// ProcessEvent processes an incoming event and applies it to the store
|
||||
func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
||||
// Get latest event for sequence number and hash chaining
|
||||
latestEvent, err := es.GetLatestEvent()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get latest event: %w", err)
|
||||
}
|
||||
|
||||
// Prepare the event
|
||||
event := &Event{
|
||||
ItemID: incomingEvent.ItemID,
|
||||
Collection: incomingEvent.Collection,
|
||||
Operation: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
EventID: uuid.New().String(),
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Set sequence number
|
||||
if latestEvent == nil {
|
||||
event.Seq = 1
|
||||
} else {
|
||||
event.Seq = latestEvent.Seq + 1
|
||||
}
|
||||
|
||||
// Calculate hash
|
||||
prevHash := ""
|
||||
if latestEvent != nil {
|
||||
prevHash = latestEvent.Hash
|
||||
}
|
||||
event.Hash = event.calculateHash(prevHash)
|
||||
|
||||
// Save event to event log
|
||||
if err := es.saveEvent(event); err != nil {
|
||||
return nil, fmt.Errorf("failed to save event: %w", err)
|
||||
}
|
||||
|
||||
// Apply the event to cached data
|
||||
if err := es.applyEvent(event); err != nil {
|
||||
return nil, fmt.Errorf("failed to apply event: %w", err)
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// saveEvent saves an event to the events collection
|
||||
func (es *SimpleEventStore) saveEvent(event *Event) error {
|
||||
collection, err := es.app.FindCollectionByNameOrId("events")
|
||||
if err != nil {
|
||||
return fmt.Errorf("events collection not found: %w", err)
|
||||
}
|
||||
|
||||
record := core.NewRecord(collection)
|
||||
record.Set("seq", event.Seq)
|
||||
record.Set("hash", event.Hash)
|
||||
record.Set("item_id", event.ItemID)
|
||||
record.Set("event_id", event.EventID)
|
||||
record.Set("collection", event.Collection)
|
||||
record.Set("operation", event.Operation)
|
||||
record.Set("path", event.Path)
|
||||
record.Set("value", event.Value)
|
||||
record.Set("from", event.From)
|
||||
record.Set("timestamp", event.Timestamp.Format(time.RFC3339))
|
||||
|
||||
return es.app.Save(record)
|
||||
}
|
||||
|
||||
// applyEvent applies a single operation to the cached data
|
||||
func (es *SimpleEventStore) applyEvent(event *Event) error {
|
||||
// Get current document state
|
||||
currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID)
|
||||
if err != nil {
|
||||
// If document doesn't exist, create empty one
|
||||
currentDoc = map[string]interface{}{
|
||||
"id": event.ItemID,
|
||||
}
|
||||
}
|
||||
|
||||
// Apply single operation
|
||||
patcher := &JSONPatcher{}
|
||||
patches := []PatchOperation{{
|
||||
Op: event.Operation,
|
||||
Path: event.Path,
|
||||
Value: event.Value,
|
||||
From: event.From,
|
||||
}}
|
||||
|
||||
updatedDoc, err := patcher.ApplyPatches(currentDoc, patches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to apply operation: %w", err)
|
||||
}
|
||||
|
||||
// Update timestamp
|
||||
updatedDoc["updated_at"] = event.Timestamp.Format(time.RFC3339)
|
||||
|
||||
// Set created_at if this is a new document
|
||||
if _, exists := currentDoc["created_at"]; !exists {
|
||||
updatedDoc["created_at"] = event.Timestamp.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
// Save the updated document
|
||||
return es.saveDocument(event.Collection, event.ItemID, updatedDoc)
|
||||
}
|
||||
|
||||
// getCurrentDocument retrieves the current state of a document
|
||||
func (es *SimpleEventStore) getCurrentDocument(collection, itemID string) (map[string]interface{}, error) {
|
||||
var result map[string]interface{}
|
||||
|
||||
rows, err := es.app.DB().
|
||||
Select("*").
|
||||
From(collection).
|
||||
Where(dbx.HashExp{"id": itemID}).
|
||||
Limit(1).
|
||||
Rows()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return nil, fmt.Errorf("document not found")
|
||||
}
|
||||
|
||||
// Get column names
|
||||
columns, err := rows.Columns()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Scan values
|
||||
values := make([]interface{}, len(columns))
|
||||
valuePtrs := make([]interface{}, len(columns))
|
||||
for i := range values {
|
||||
valuePtrs[i] = &values[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(valuePtrs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create map
|
||||
result = make(map[string]interface{})
|
||||
for i, col := range columns {
|
||||
result[col] = values[i]
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// saveDocument saves a document to the collection
|
||||
func (es *SimpleEventStore) saveDocument(collectionName, itemID string, doc map[string]interface{}) error {
|
||||
collection, err := es.app.FindCollectionByNameOrId(collectionName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("collection %s not found: %w", collectionName, err)
|
||||
}
|
||||
|
||||
// Try to find existing record
|
||||
record, err := es.app.FindFirstRecordByFilter(collectionName, "id = {:id}", map[string]any{"id": itemID})
|
||||
if err != nil {
|
||||
// Record doesn't exist, create new one
|
||||
record = core.NewRecord(collection)
|
||||
record.Set("id", itemID)
|
||||
}
|
||||
|
||||
// Set all fields from doc
|
||||
for key, value := range doc {
|
||||
if key != "id" { // Don't overwrite the ID
|
||||
record.Set(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
return es.app.Save(record)
|
||||
}
|
||||
|
||||
// GetEventsSince returns events since the given sequence number
|
||||
func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
records, err := es.app.FindRecordsByFilter("events", "seq > {:seq}", "seq", 1000, 0, map[string]any{"seq": seq})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch events: %w", err)
|
||||
}
|
||||
|
||||
events := make([]Event, len(records))
|
||||
for i, record := range records {
|
||||
events[i] = Event{
|
||||
Seq: record.GetInt("seq"),
|
||||
Hash: record.GetString("hash"),
|
||||
ItemID: record.GetString("item_id"),
|
||||
EventID: record.GetString("event_id"),
|
||||
Collection: record.GetString("collection"),
|
||||
Operation: record.GetString("operation"),
|
||||
Path: record.GetString("path"),
|
||||
Value: record.GetString("value"),
|
||||
From: record.GetString("from"),
|
||||
}
|
||||
|
||||
// Parse timestamp
|
||||
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
||||
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
||||
events[i].Timestamp = timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// GetAllItems returns all non-deleted items from a collection
|
||||
func (es *SimpleEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
var items []map[string]interface{}
|
||||
|
||||
rows, err := es.app.DB().
|
||||
Select("*").
|
||||
From(collection).
|
||||
Where(dbx.NewExp("deleted_at IS NULL OR deleted_at = ''")).
|
||||
OrderBy("created_at DESC").
|
||||
Rows()
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch items: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Get column names
|
||||
columns, err := rows.Columns()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
// Create slice to hold values
|
||||
values := make([]interface{}, len(columns))
|
||||
valuePtrs := make([]interface{}, len(columns))
|
||||
for i := range values {
|
||||
valuePtrs[i] = &values[i]
|
||||
}
|
||||
|
||||
// Scan into values
|
||||
if err := rows.Scan(valuePtrs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create map from columns and values
|
||||
item := make(map[string]interface{})
|
||||
for i, col := range columns {
|
||||
item[col] = values[i]
|
||||
}
|
||||
|
||||
items = append(items, item)
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// ValidateSync checks if client sync state matches server state
|
||||
func (es *SimpleEventStore) ValidateSync(clientSeq int, clientHash string) (bool, error) {
|
||||
if clientSeq == 0 {
|
||||
return false, nil // Full sync needed
|
||||
}
|
||||
|
||||
var hash string
|
||||
err := es.app.DB().
|
||||
Select("hash").
|
||||
From("events").
|
||||
Where(dbx.NewExp("seq = {:seq}", map[string]any{"seq": clientSeq})).
|
||||
Limit(1).
|
||||
One(&hash)
|
||||
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil // Event not found, full sync needed
|
||||
}
|
||||
return false, fmt.Errorf("failed to validate sync: %w", err)
|
||||
}
|
||||
|
||||
return hash == clientHash, nil
|
||||
}
|
407
event_store_test.go
Normal file
407
event_store_test.go
Normal file
@@ -0,0 +1,407 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEvent_Serialize(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
event Event
|
||||
}{
|
||||
{
|
||||
name: "simple event",
|
||||
event: Event{
|
||||
Seq: 1,
|
||||
ItemID: "test123",
|
||||
EventID: "event-123",
|
||||
Collection: "items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test value",
|
||||
From: "",
|
||||
Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "event with multiple fields",
|
||||
event: Event{
|
||||
Seq: 2,
|
||||
ItemID: "test456",
|
||||
EventID: "event-456",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/priority",
|
||||
Value: "high",
|
||||
From: "/old_priority",
|
||||
Timestamp: time.Date(2025, 1, 2, 12, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
serialized := tt.event.serialize()
|
||||
|
||||
// Check that all fields are present in serialized string
|
||||
assert.Contains(t, serialized, "seq:")
|
||||
assert.Contains(t, serialized, "item_id:")
|
||||
assert.Contains(t, serialized, "event_id:")
|
||||
assert.Contains(t, serialized, "collection:")
|
||||
assert.Contains(t, serialized, "operation:")
|
||||
assert.Contains(t, serialized, "path:")
|
||||
assert.Contains(t, serialized, "value:")
|
||||
assert.Contains(t, serialized, "from:")
|
||||
assert.Contains(t, serialized, "timestamp:")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvent_CalculateHash(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
event Event
|
||||
prevHash string
|
||||
}{
|
||||
{
|
||||
name: "first event (no previous hash)",
|
||||
event: Event{
|
||||
Seq: 1,
|
||||
ItemID: "test123",
|
||||
EventID: "event-123",
|
||||
Collection: "items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test value",
|
||||
Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
prevHash: "",
|
||||
},
|
||||
{
|
||||
name: "second event (with previous hash)",
|
||||
event: Event{
|
||||
Seq: 2,
|
||||
ItemID: "test123",
|
||||
EventID: "event-124",
|
||||
Collection: "items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "updated value",
|
||||
Timestamp: time.Date(2025, 1, 1, 1, 0, 0, 0, time.UTC),
|
||||
},
|
||||
prevHash: "previous_hash_123",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
hash := tt.event.calculateHash(tt.prevHash)
|
||||
|
||||
// Hash should not be empty
|
||||
assert.NotEmpty(t, hash)
|
||||
|
||||
// Hash should be deterministic
|
||||
hash2 := tt.event.calculateHash(tt.prevHash)
|
||||
assert.Equal(t, hash, hash2)
|
||||
|
||||
// Different previous hash should produce different result
|
||||
if tt.prevHash == "" {
|
||||
differentHash := tt.event.calculateHash("different")
|
||||
assert.NotEqual(t, hash, differentHash)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvent_CalculateHash_Consistency(t *testing.T) {
|
||||
event := Event{
|
||||
Seq: 1,
|
||||
ItemID: "test123",
|
||||
EventID: "event-123",
|
||||
Collection: "items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test value",
|
||||
Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
}
|
||||
|
||||
// Same event should always produce same hash
|
||||
hash1 := event.calculateHash("prev")
|
||||
hash2 := event.calculateHash("prev")
|
||||
assert.Equal(t, hash1, hash2)
|
||||
|
||||
// Different previous hash should produce different hash
|
||||
hash3 := event.calculateHash("different_prev")
|
||||
assert.NotEqual(t, hash1, hash3)
|
||||
}
|
||||
|
||||
// Mock Event Store for testing
|
||||
type MockEventStore struct {
|
||||
events []Event
|
||||
latestEvent *Event
|
||||
processError error
|
||||
syncValid bool
|
||||
}
|
||||
|
||||
func (m *MockEventStore) ProcessEvent(event *Event) (*Event, error) {
|
||||
if m.processError != nil {
|
||||
return nil, m.processError
|
||||
}
|
||||
|
||||
processedEvent := &Event{
|
||||
Seq: len(m.events) + 1,
|
||||
Hash: "mock_hash",
|
||||
ItemID: event.ItemID,
|
||||
EventID: "mock_event_id",
|
||||
Collection: event.Collection,
|
||||
Operation: event.Operation,
|
||||
Path: event.Path,
|
||||
Value: event.Value,
|
||||
From: event.From,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
m.events = append(m.events, *processedEvent)
|
||||
m.latestEvent = processedEvent
|
||||
return processedEvent, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) GetLatestEvent() (*Event, error) {
|
||||
return m.latestEvent, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
var events []Event
|
||||
for _, event := range m.events {
|
||||
if event.Seq > seq {
|
||||
events = append(events, event)
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) ValidateSync(seq int, hash string) (bool, error) {
|
||||
return m.syncValid, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
// Mock implementation
|
||||
items := []map[string]interface{}{
|
||||
{"id": "item1", "content": "test item 1"},
|
||||
{"id": "item2", "content": "test item 2"},
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func TestEventProcessing_CreateItem(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
event := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "My test item",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/content", result.Path)
|
||||
assert.Equal(t, "My test item", result.Value)
|
||||
assert.Equal(t, 1, result.Seq)
|
||||
}
|
||||
|
||||
func TestEventProcessing_UpdateItem(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
// Create item first
|
||||
createEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Original content",
|
||||
}
|
||||
_, err := mockStore.ProcessEvent(createEvent)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Update item
|
||||
updateEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "Updated content",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(updateEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, "replace", result.Operation)
|
||||
assert.Equal(t, "/content", result.Path)
|
||||
assert.Equal(t, "Updated content", result.Value)
|
||||
assert.Equal(t, 2, result.Seq)
|
||||
}
|
||||
|
||||
func TestEventProcessing_DeleteItem(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
// Create item first
|
||||
createEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Test content",
|
||||
}
|
||||
_, err := mockStore.ProcessEvent(createEvent)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Soft delete item
|
||||
deleteEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/deleted_at",
|
||||
Value: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(deleteEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/deleted_at", result.Path)
|
||||
assert.NotEmpty(t, result.Value)
|
||||
assert.Equal(t, 2, result.Seq)
|
||||
}
|
||||
|
||||
func TestEventProcessing_ComplexOperations(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
operation string
|
||||
path string
|
||||
value string
|
||||
from string
|
||||
}{
|
||||
{"add priority", "add", "/priority", "high", ""},
|
||||
{"replace content", "replace", "/content", "new content", ""},
|
||||
{"move field", "move", "/title", "", "/content"},
|
||||
{"copy field", "copy", "/backup_content", "", "/content"},
|
||||
{"remove field", "remove", "/priority", "", ""},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
event := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: tt.operation,
|
||||
Path: tt.path,
|
||||
Value: tt.value,
|
||||
From: tt.from,
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, tt.operation, result.Operation)
|
||||
assert.Equal(t, tt.path, result.Path)
|
||||
assert.Equal(t, i+1, result.Seq)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventSerialization(t *testing.T) {
|
||||
event := Event{
|
||||
Seq: 42,
|
||||
ItemID: "item-456",
|
||||
EventID: "event-789",
|
||||
Collection: "test_collection",
|
||||
Operation: "replace",
|
||||
Path: "/status",
|
||||
Value: "completed",
|
||||
From: "/old_status",
|
||||
Timestamp: time.Date(2025, 3, 15, 14, 30, 0, 0, time.UTC),
|
||||
}
|
||||
|
||||
serialized := event.serialize()
|
||||
|
||||
// Verify all components are present
|
||||
assert.Contains(t, serialized, "seq:42")
|
||||
assert.Contains(t, serialized, "item_id:item-456")
|
||||
assert.Contains(t, serialized, "event_id:event-789")
|
||||
assert.Contains(t, serialized, "collection:test_collection")
|
||||
assert.Contains(t, serialized, "operation:replace")
|
||||
assert.Contains(t, serialized, "path:/status")
|
||||
assert.Contains(t, serialized, "value:completed")
|
||||
assert.Contains(t, serialized, "from:/old_status")
|
||||
assert.Contains(t, serialized, "timestamp:2025-03-15T14:30:00Z")
|
||||
}
|
||||
|
||||
func TestPatchOperationSerialization(t *testing.T) {
|
||||
op := PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/test",
|
||||
Value: "test value",
|
||||
From: "/source",
|
||||
}
|
||||
|
||||
// Test that PatchOperation can be used in JSON operations
|
||||
assert.Equal(t, "add", op.Op)
|
||||
assert.Equal(t, "/test", op.Path)
|
||||
assert.Equal(t, "test value", op.Value)
|
||||
assert.Equal(t, "/source", op.From)
|
||||
}
|
||||
|
||||
func TestSyncRequest_Response(t *testing.T) {
|
||||
// Test SyncRequest
|
||||
req := SyncRequest{
|
||||
LastSeq: 10,
|
||||
LastHash: "abc123hash",
|
||||
}
|
||||
|
||||
assert.Equal(t, 10, req.LastSeq)
|
||||
assert.Equal(t, "abc123hash", req.LastHash)
|
||||
|
||||
// Test SyncResponse
|
||||
events := []Event{
|
||||
{
|
||||
Seq: 11,
|
||||
ItemID: "item1",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "new item",
|
||||
},
|
||||
{
|
||||
Seq: 12,
|
||||
ItemID: "item2",
|
||||
Operation: "replace",
|
||||
Path: "/status",
|
||||
Value: "updated",
|
||||
},
|
||||
}
|
||||
|
||||
resp := SyncResponse{
|
||||
Events: events,
|
||||
CurrentSeq: 12,
|
||||
CurrentHash: "def456hash",
|
||||
FullSync: false,
|
||||
}
|
||||
|
||||
assert.Len(t, resp.Events, 2)
|
||||
assert.Equal(t, 12, resp.CurrentSeq)
|
||||
assert.Equal(t, "def456hash", resp.CurrentHash)
|
||||
assert.False(t, resp.FullSync)
|
||||
|
||||
// Verify event details
|
||||
assert.Equal(t, "add", resp.Events[0].Operation)
|
||||
assert.Equal(t, "replace", resp.Events[1].Operation)
|
||||
}
|
269
example_client.go
Normal file
269
example_client.go
Normal file
@@ -0,0 +1,269 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ExampleClient demonstrates how to interact with the event store API
|
||||
type ExampleClient struct {
|
||||
baseURL string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
func NewExampleClient(baseURL string) *ExampleClient {
|
||||
return &ExampleClient{
|
||||
baseURL: baseURL,
|
||||
client: &http.Client{Timeout: 10 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ExampleClient) CreateItem(itemID, content string) error {
|
||||
// Create item using JSON Patch "add" operations
|
||||
patches := []PatchOperation{
|
||||
{Op: "add", Path: "/content", Value: content},
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, patches)
|
||||
}
|
||||
|
||||
func (c *ExampleClient) UpdateItem(itemID, content string) error {
|
||||
// Update item using JSON Patch "replace" operation
|
||||
patches := []PatchOperation{
|
||||
{Op: "replace", Path: "/content", Value: content},
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, patches)
|
||||
}
|
||||
|
||||
func (c *ExampleClient) DeleteItem(itemID string) error {
|
||||
// Delete item using JSON Patch "add" operation for deleted_at
|
||||
patches := []PatchOperation{
|
||||
{Op: "add", Path: "/deleted_at", Value: time.Now()},
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, patches)
|
||||
}
|
||||
|
||||
// sendPatch sends JSON Patch operations using the PATCH method
|
||||
func (c *ExampleClient) sendPatch(collection, itemID string, patches []PatchOperation) error {
|
||||
jsonData, err := json.Marshal(patches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal patches: %w", err)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/api/collections/%s/items/%s", c.baseURL, collection, itemID)
|
||||
req, err := http.NewRequest("PATCH", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send patch: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("patch failed: %s", string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendEvent sends event using the legacy POST method
|
||||
func (c *ExampleClient) sendEvent(event Event) error {
|
||||
jsonData, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal event: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.client.Post(c.baseURL+"/api/events", "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send event: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("event creation failed: %s", string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ExampleClient) SyncEvents(lastSeq int, lastHash string) (*SyncResponse, error) {
|
||||
syncReq := SyncRequest{
|
||||
LastSeq: lastSeq,
|
||||
LastHash: lastHash,
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(syncReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal sync request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.client.Post(c.baseURL+"/api/sync", "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to sync: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("sync failed: %s", string(body))
|
||||
}
|
||||
|
||||
var syncResp SyncResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&syncResp); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode sync response: %w", err)
|
||||
}
|
||||
|
||||
return &syncResp, nil
|
||||
}
|
||||
|
||||
func (c *ExampleClient) GetItems(collection string) ([]map[string]interface{}, error) {
|
||||
resp, err := c.client.Get(c.baseURL + "/api/items/" + collection)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get items: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("get items failed: %s", string(body))
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
items, ok := result["items"].([]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid response format")
|
||||
}
|
||||
|
||||
var typedItems []map[string]interface{}
|
||||
for _, item := range items {
|
||||
if typedItem, ok := item.(map[string]interface{}); ok {
|
||||
typedItems = append(typedItems, typedItem)
|
||||
}
|
||||
}
|
||||
|
||||
return typedItems, nil
|
||||
}
|
||||
|
||||
// Advanced JSON Patch operations examples
|
||||
func (c *ExampleClient) AdvancedPatchExample(itemID string) error {
|
||||
// Example: Create item with multiple fields
|
||||
patches := []PatchOperation{
|
||||
{Op: "add", Path: "/content", Value: "Advanced Shopping Item"},
|
||||
{Op: "add", Path: "/quantity", Value: 5},
|
||||
{Op: "add", Path: "/tags", Value: []string{"grocery", "urgent"}},
|
||||
{Op: "add", Path: "/metadata", Value: map[string]interface{}{
|
||||
"store": "SuperMart",
|
||||
"category": "food",
|
||||
}},
|
||||
}
|
||||
|
||||
if err := c.sendPatch("shopping_items", itemID, patches); err != nil {
|
||||
return fmt.Errorf("failed to create advanced item: %w", err)
|
||||
}
|
||||
|
||||
// Example: Complex updates using different operations
|
||||
updatePatches := []PatchOperation{
|
||||
{Op: "replace", Path: "/quantity", Value: 3}, // Update quantity
|
||||
{Op: "add", Path: "/tags/-", Value: "sale"}, // Append to array
|
||||
{Op: "replace", Path: "/metadata/store", Value: "MegaMart"}, // Update nested field
|
||||
{Op: "add", Path: "/metadata/priority", Value: "high"}, // Add new nested field
|
||||
{Op: "remove", Path: "/tags/0"}, // Remove first tag
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, updatePatches)
|
||||
}
|
||||
|
||||
// Example usage function
|
||||
func runClientExample() {
|
||||
client := NewExampleClient("http://localhost:8090")
|
||||
|
||||
fmt.Println("=== RFC6902 JSON Patch Event Store Example ===")
|
||||
|
||||
// Create some shopping items using JSON Patch
|
||||
fmt.Println("Creating shopping items with JSON Patch...")
|
||||
if err := client.CreateItem("item1", "Milk"); err != nil {
|
||||
fmt.Printf("Error creating item1: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := client.CreateItem("item2", "Bread"); err != nil {
|
||||
fmt.Printf("Error creating item2: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Update an item using JSON Patch replace
|
||||
fmt.Println("Updating item1 with JSON Patch replace...")
|
||||
if err := client.UpdateItem("item1", "Organic Milk"); err != nil {
|
||||
fmt.Printf("Error updating item1: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Advanced JSON Patch operations
|
||||
fmt.Println("Demonstrating advanced JSON Patch operations...")
|
||||
if err := client.AdvancedPatchExample("item3"); err != nil {
|
||||
fmt.Printf("Error with advanced patch example: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Get current items
|
||||
fmt.Println("Fetching current items...")
|
||||
items, err := client.GetItems("shopping_items")
|
||||
if err != nil {
|
||||
fmt.Printf("Error getting items: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Current items: %+v\n", items)
|
||||
|
||||
// Sync events (simulate client sync)
|
||||
fmt.Println("Syncing events...")
|
||||
syncResp, err := client.SyncEvents(0, "")
|
||||
if err != nil {
|
||||
fmt.Printf("Error syncing: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Sync response - Found %d events\n", len(syncResp.Events))
|
||||
|
||||
// Soft delete an item using JSON Patch
|
||||
fmt.Println("Soft deleting item2 with JSON Patch...")
|
||||
if err := client.DeleteItem("item2"); err != nil {
|
||||
fmt.Printf("Error deleting item2: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Get items again to see the change
|
||||
fmt.Println("Fetching items after deletion...")
|
||||
items, err = client.GetItems("shopping_items")
|
||||
if err != nil {
|
||||
fmt.Printf("Error getting items: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Items after deletion: %+v\n", items)
|
||||
fmt.Println("=== JSON Patch Example completed ===")
|
||||
}
|
||||
|
||||
// Uncomment the following to run the example:
|
||||
// func init() {
|
||||
// go func() {
|
||||
// time.Sleep(2 * time.Second) // Wait for server to start
|
||||
// runClientExample()
|
||||
// }()
|
||||
// }
|
2
go.mod
2
go.mod
@@ -7,6 +7,7 @@ toolchain go1.24.7
|
||||
require (
|
||||
git.site.quack-lab.dev/dave/cylogger v1.4.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/pocketbase/dbx v1.11.0
|
||||
github.com/pocketbase/pocketbase v0.30.0
|
||||
github.com/stretchr/testify v1.4.0
|
||||
)
|
||||
@@ -29,7 +30,6 @@ require (
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/pocketbase/dbx v1.11.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/spf13/cast v1.9.2 // indirect
|
||||
github.com/spf13/cobra v1.10.1 // indirect
|
||||
|
606
integration_test.go
Normal file
606
integration_test.go
Normal file
@@ -0,0 +1,606 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// MockIntegrationEventStore for integration testing
|
||||
type MockIntegrationEventStore struct {
|
||||
events []Event
|
||||
items map[string]map[string]interface{}
|
||||
nextSeq int
|
||||
patcher *JSONPatcher
|
||||
syncValid bool
|
||||
forceError bool
|
||||
}
|
||||
|
||||
func NewMockIntegrationEventStore() *MockIntegrationEventStore {
|
||||
return &MockIntegrationEventStore{
|
||||
events: []Event{},
|
||||
items: make(map[string]map[string]interface{}),
|
||||
nextSeq: 1,
|
||||
patcher: &JSONPatcher{},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) GetLatestEvent() (*Event, error) {
|
||||
if len(m.events) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return &m.events[len(m.events)-1], nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
||||
if m.forceError {
|
||||
return nil, assert.AnError
|
||||
}
|
||||
|
||||
// Create processed event
|
||||
event := &Event{
|
||||
Seq: m.nextSeq,
|
||||
Hash: "hash_" + string(rune(m.nextSeq+'0')),
|
||||
ItemID: incomingEvent.ItemID,
|
||||
EventID: "event_" + string(rune(m.nextSeq+'0')),
|
||||
Collection: incomingEvent.Collection,
|
||||
Operation: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Apply event to items using JSON Patch
|
||||
itemKey := incomingEvent.Collection + ":" + incomingEvent.ItemID
|
||||
if m.items[itemKey] == nil {
|
||||
m.items[itemKey] = map[string]interface{}{
|
||||
"id": incomingEvent.ItemID,
|
||||
}
|
||||
}
|
||||
|
||||
// Create patch operation and apply it
|
||||
patch := PatchOperation{
|
||||
Op: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
}
|
||||
|
||||
updatedItem, err := m.patcher.ApplyPatches(m.items[itemKey], []PatchOperation{patch})
|
||||
if err != nil {
|
||||
return nil, err // Return the actual JSON patch error
|
||||
}
|
||||
m.items[itemKey] = updatedItem
|
||||
|
||||
m.events = append(m.events, *event)
|
||||
m.nextSeq++
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
var events []Event
|
||||
for _, event := range m.events {
|
||||
if event.Seq > seq {
|
||||
events = append(events, event)
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
var items []map[string]interface{}
|
||||
for key, item := range m.items {
|
||||
if len(key) >= len(collection) && key[:len(collection)] == collection {
|
||||
// Skip soft-deleted items
|
||||
if deletedAt, exists := item["deleted_at"]; !exists || deletedAt == "" {
|
||||
items = append(items, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) ValidateSync(seq int, hash string) (bool, error) {
|
||||
if m.forceError {
|
||||
return false, assert.AnError
|
||||
}
|
||||
return m.syncValid, nil
|
||||
}
|
||||
|
||||
func TestFullWorkflow(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
|
||||
t.Run("1. Create initial item using JSON Patch", func(t *testing.T) {
|
||||
event := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Initial workflow item",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, result.Seq)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/content", result.Path)
|
||||
assert.Equal(t, "Initial workflow item", result.Value)
|
||||
})
|
||||
|
||||
t.Run("2. Update item with multiple operations", func(t *testing.T) {
|
||||
// Add priority
|
||||
event1 := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/priority",
|
||||
Value: "high",
|
||||
}
|
||||
result1, err := mockStore.ProcessEvent(event1)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, result1.Seq)
|
||||
|
||||
// Update content
|
||||
event2 := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "Updated workflow item",
|
||||
}
|
||||
result2, err := mockStore.ProcessEvent(event2)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, result2.Seq)
|
||||
})
|
||||
|
||||
t.Run("3. Create second item", func(t *testing.T) {
|
||||
event := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Second workflow item",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 4, result.Seq)
|
||||
})
|
||||
|
||||
t.Run("4. Get all items", func(t *testing.T) {
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 2)
|
||||
|
||||
// Find first item and verify its state
|
||||
var item1 map[string]interface{}
|
||||
for _, item := range items {
|
||||
if item["id"] == "workflow_item_001" {
|
||||
item1 = item
|
||||
break
|
||||
}
|
||||
}
|
||||
require.NotNil(t, item1)
|
||||
assert.Equal(t, "Updated workflow item", item1["content"])
|
||||
assert.Equal(t, "high", item1["priority"])
|
||||
})
|
||||
|
||||
t.Run("5. Test client synchronization", func(t *testing.T) {
|
||||
// Test getting events since sequence 2
|
||||
events, err := mockStore.GetEventsSince(2)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, events, 2) // events 3 and 4
|
||||
|
||||
// Verify latest event
|
||||
latestEvent, err := mockStore.GetLatestEvent()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 4, latestEvent.Seq)
|
||||
|
||||
// Test sync validation
|
||||
mockStore.syncValid = true
|
||||
isValid, err := mockStore.ValidateSync(4, "hash_4")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, isValid)
|
||||
})
|
||||
|
||||
t.Run("6. Soft delete an item", func(t *testing.T) {
|
||||
deleteEvent := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/deleted_at",
|
||||
Value: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(deleteEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, result.Seq)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/deleted_at", result.Path)
|
||||
|
||||
// Verify item is now filtered out
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 1) // Only one item remaining
|
||||
|
||||
// Verify remaining item is item_002
|
||||
assert.Equal(t, "workflow_item_002", items[0]["id"])
|
||||
})
|
||||
|
||||
t.Run("7. Complex nested operations", func(t *testing.T) {
|
||||
// Add metadata object
|
||||
metadataEvent := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/metadata",
|
||||
Value: `{"created_by": "system", "version": 1}`,
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(metadataEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 6, result.Seq)
|
||||
|
||||
// Skip nested field update since we store JSON as strings
|
||||
// The metadata is stored as a JSON string, not a parsed object
|
||||
})
|
||||
|
||||
t.Run("8. Test operation on nested data", func(t *testing.T) {
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 1)
|
||||
|
||||
item := items[0]
|
||||
assert.Equal(t, "workflow_item_002", item["id"])
|
||||
assert.Equal(t, "Second workflow item", item["content"])
|
||||
|
||||
// Check nested metadata (stored as JSON string)
|
||||
metadataStr, ok := item["metadata"].(string)
|
||||
require.True(t, ok)
|
||||
assert.Contains(t, metadataStr, "system")
|
||||
assert.Contains(t, metadataStr, "1") // Original value
|
||||
})
|
||||
|
||||
t.Run("9. Test move operation", func(t *testing.T) {
|
||||
moveEvent := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "move",
|
||||
From: "/content",
|
||||
Path: "/title",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(moveEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 7, result.Seq)
|
||||
|
||||
// Verify move operation
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
item := items[0]
|
||||
|
||||
assert.Equal(t, "Second workflow item", item["title"])
|
||||
assert.Nil(t, item["content"]) // Should be moved, not copied
|
||||
})
|
||||
|
||||
t.Run("10. Test copy operation", func(t *testing.T) {
|
||||
copyEvent := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "copy",
|
||||
From: "/title",
|
||||
Path: "/backup_title",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(copyEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 8, result.Seq)
|
||||
|
||||
// Verify copy operation
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
item := items[0]
|
||||
|
||||
assert.Equal(t, "Second workflow item", item["title"])
|
||||
assert.Equal(t, "Second workflow item", item["backup_title"])
|
||||
})
|
||||
|
||||
t.Run("11. Final state verification", func(t *testing.T) {
|
||||
// Verify final event count
|
||||
latestEvent, err := mockStore.GetLatestEvent()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 8, latestEvent.Seq)
|
||||
|
||||
// Verify all events are recorded
|
||||
allEvents, err := mockStore.GetEventsSince(0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, allEvents, 8)
|
||||
|
||||
// Verify final item state
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 1)
|
||||
|
||||
finalItem := items[0]
|
||||
assert.Equal(t, "workflow_item_002", finalItem["id"])
|
||||
assert.Equal(t, "Second workflow item", finalItem["title"])
|
||||
assert.Equal(t, "Second workflow item", finalItem["backup_title"])
|
||||
assert.Nil(t, finalItem["content"]) // Moved to title
|
||||
|
||||
metadataStr, ok := finalItem["metadata"].(string)
|
||||
require.True(t, ok)
|
||||
assert.Contains(t, metadataStr, "system")
|
||||
assert.Contains(t, metadataStr, "1")
|
||||
})
|
||||
}
|
||||
|
||||
func TestErrorScenarios(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
setup func(*MockIntegrationEventStore)
|
||||
event Event
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Invalid JSON Patch operation",
|
||||
setup: func(m *MockIntegrationEventStore) {},
|
||||
event: Event{
|
||||
ItemID: "error_test_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "invalid_op",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Test operation failure",
|
||||
setup: func(m *MockIntegrationEventStore) {
|
||||
// Create item first
|
||||
m.items["shopping_items:test_item"] = map[string]interface{}{
|
||||
"id": "test_item",
|
||||
"content": "original",
|
||||
}
|
||||
},
|
||||
event: Event{
|
||||
ItemID: "test_item",
|
||||
Collection: "shopping_items",
|
||||
Operation: "test",
|
||||
Path: "/content",
|
||||
Value: "different", // This should fail the test
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Remove non-existent field",
|
||||
setup: func(m *MockIntegrationEventStore) {
|
||||
m.items["shopping_items:test_item"] = map[string]interface{}{
|
||||
"id": "test_item",
|
||||
}
|
||||
},
|
||||
event: Event{
|
||||
ItemID: "test_item",
|
||||
Collection: "shopping_items",
|
||||
Operation: "remove",
|
||||
Path: "/nonexistent",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Replace non-existent field",
|
||||
setup: func(m *MockIntegrationEventStore) {
|
||||
m.items["shopping_items:test_item"] = map[string]interface{}{
|
||||
"id": "test_item",
|
||||
}
|
||||
},
|
||||
event: Event{
|
||||
ItemID: "test_item",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/nonexistent",
|
||||
Value: "value",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
tt.setup(mockStore)
|
||||
|
||||
_, err := mockStore.ProcessEvent(&tt.event)
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncScenarios(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
|
||||
// Create some events
|
||||
events := []*Event{
|
||||
{
|
||||
ItemID: "sync_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "First item",
|
||||
},
|
||||
{
|
||||
ItemID: "sync_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Second item",
|
||||
},
|
||||
{
|
||||
ItemID: "sync_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "Updated first item",
|
||||
},
|
||||
}
|
||||
|
||||
// Process all events
|
||||
for _, event := range events {
|
||||
_, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Run("Full sync from beginning", func(t *testing.T) {
|
||||
events, err := mockStore.GetEventsSince(0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, events, 3)
|
||||
|
||||
// Verify event sequence
|
||||
for i, event := range events {
|
||||
assert.Equal(t, i+1, event.Seq)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Incremental sync", func(t *testing.T) {
|
||||
events, err := mockStore.GetEventsSince(1)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, events, 2)
|
||||
|
||||
// Should get events 2 and 3
|
||||
assert.Equal(t, 2, events[0].Seq)
|
||||
assert.Equal(t, 3, events[1].Seq)
|
||||
})
|
||||
|
||||
t.Run("Sync validation", func(t *testing.T) {
|
||||
// Test valid sync
|
||||
mockStore.syncValid = true
|
||||
isValid, err := mockStore.ValidateSync(3, "hash_3")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, isValid)
|
||||
|
||||
// Test invalid sync
|
||||
mockStore.syncValid = false
|
||||
isValid, err = mockStore.ValidateSync(2, "wrong_hash")
|
||||
require.NoError(t, err)
|
||||
assert.False(t, isValid)
|
||||
})
|
||||
|
||||
t.Run("Get latest event", func(t *testing.T) {
|
||||
latest, err := mockStore.GetLatestEvent()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, latest.Seq)
|
||||
assert.Equal(t, "sync_item_001", latest.ItemID)
|
||||
assert.Equal(t, "replace", latest.Operation)
|
||||
})
|
||||
}
|
||||
|
||||
func TestConcurrentOperations(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
|
||||
// Simulate concurrent operations on different items
|
||||
t.Run("Multiple items concurrent creation", func(t *testing.T) {
|
||||
events := []*Event{
|
||||
{
|
||||
ItemID: "concurrent_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Concurrent item 1",
|
||||
},
|
||||
{
|
||||
ItemID: "concurrent_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Concurrent item 2",
|
||||
},
|
||||
{
|
||||
ItemID: "concurrent_003",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Concurrent item 3",
|
||||
},
|
||||
}
|
||||
|
||||
// Process events sequentially (simulating concurrent processing)
|
||||
var results []*Event
|
||||
for _, event := range events {
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
// Verify all events were processed with proper sequence numbers
|
||||
for i, result := range results {
|
||||
assert.Equal(t, i+1, result.Seq)
|
||||
}
|
||||
|
||||
// Verify all items exist
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 3)
|
||||
})
|
||||
|
||||
t.Run("Same item multiple operations", func(t *testing.T) {
|
||||
itemID := "concurrent_same_item"
|
||||
operations := []*Event{
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Initial content",
|
||||
},
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/priority",
|
||||
Value: "low",
|
||||
},
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/priority",
|
||||
Value: "high",
|
||||
},
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/tags",
|
||||
Value: `["urgent", "important"]`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, op := range operations {
|
||||
_, err := mockStore.ProcessEvent(op)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Verify final state
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
|
||||
var targetItem map[string]interface{}
|
||||
for _, item := range items {
|
||||
if item["id"] == itemID {
|
||||
targetItem = item
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.NotNil(t, targetItem)
|
||||
assert.Equal(t, "Initial content", targetItem["content"])
|
||||
assert.Equal(t, "high", targetItem["priority"])
|
||||
assert.NotNil(t, targetItem["tags"])
|
||||
})
|
||||
}
|
299
json_patch.go
Normal file
299
json_patch.go
Normal file
@@ -0,0 +1,299 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// JSONPatcher implements RFC6902 JSON Patch operations
|
||||
type JSONPatcher struct{}
|
||||
|
||||
// ApplyPatches applies a series of JSON Patch operations to a document
|
||||
func (jp *JSONPatcher) ApplyPatches(doc map[string]interface{}, patches []PatchOperation) (map[string]interface{}, error) {
|
||||
// Apply patches in-place to avoid memory duplication
|
||||
for i, patch := range patches {
|
||||
var err error
|
||||
doc, err = jp.applyPatch(doc, patch)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to apply patch %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyPatch applies a single JSON Patch operation
|
||||
func (jp *JSONPatcher) applyPatch(doc map[string]interface{}, patch PatchOperation) (map[string]interface{}, error) {
|
||||
switch patch.Op {
|
||||
case "add":
|
||||
return jp.applyAdd(doc, patch.Path, patch.Value)
|
||||
case "remove":
|
||||
return jp.applyRemove(doc, patch.Path)
|
||||
case "replace":
|
||||
return jp.applyReplace(doc, patch.Path, patch.Value)
|
||||
case "move":
|
||||
return jp.applyMove(doc, patch.From, patch.Path)
|
||||
case "copy":
|
||||
return jp.applyCopy(doc, patch.From, patch.Path)
|
||||
case "test":
|
||||
return jp.applyTest(doc, patch.Path, patch.Value)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported operation: %s", patch.Op)
|
||||
}
|
||||
}
|
||||
|
||||
// applyAdd implements the "add" operation
|
||||
func (jp *JSONPatcher) applyAdd(doc map[string]interface{}, path string, value interface{}) (map[string]interface{}, error) {
|
||||
if path == "" {
|
||||
// Adding to root replaces entire document
|
||||
if newDoc, ok := value.(map[string]interface{}); ok {
|
||||
return newDoc, nil
|
||||
}
|
||||
return nil, fmt.Errorf("cannot replace root with non-object")
|
||||
}
|
||||
|
||||
parts := jp.parsePath(path)
|
||||
if len(parts) == 0 {
|
||||
return nil, fmt.Errorf("invalid path: %s", path)
|
||||
}
|
||||
|
||||
// Navigate to parent and add the value
|
||||
parent, key, err := jp.navigateToParent(doc, parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if parent == nil {
|
||||
return nil, fmt.Errorf("parent does not exist for path: %s", path)
|
||||
}
|
||||
|
||||
if parentMap, ok := parent.(map[string]interface{}); ok {
|
||||
parentMap[key] = value
|
||||
} else if parentSlice, ok := parent.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(key)
|
||||
if err != nil {
|
||||
if key == "-" {
|
||||
// Append to end - need to modify the parent reference
|
||||
return nil, fmt.Errorf("array append operation not fully supported in this simplified implementation")
|
||||
} else {
|
||||
return nil, fmt.Errorf("invalid array index: %s", key)
|
||||
}
|
||||
} else {
|
||||
// Insert at index - simplified implementation
|
||||
if index < 0 || index > len(parentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
// For simplicity, we'll replace at index if it exists, or error if beyond bounds
|
||||
if index < len(parentSlice) {
|
||||
parentSlice[index] = value
|
||||
} else {
|
||||
return nil, fmt.Errorf("array insertion beyond bounds not supported in simplified implementation")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot add to non-object/non-array")
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyRemove implements the "remove" operation
|
||||
func (jp *JSONPatcher) applyRemove(doc map[string]interface{}, path string) (map[string]interface{}, error) {
|
||||
parts := jp.parsePath(path)
|
||||
if len(parts) == 0 {
|
||||
return nil, fmt.Errorf("cannot remove root")
|
||||
}
|
||||
|
||||
parent, key, err := jp.navigateToParent(doc, parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if parentMap, ok := parent.(map[string]interface{}); ok {
|
||||
if _, exists := parentMap[key]; !exists {
|
||||
return nil, fmt.Errorf("path does not exist: %s", path)
|
||||
}
|
||||
delete(parentMap, key)
|
||||
} else if parentSlice, ok := parent.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid array index: %s", key)
|
||||
}
|
||||
if index < 0 || index >= len(parentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
// Simplified remove - set to nil instead of actually removing
|
||||
parentSlice[index] = nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot remove from non-object/non-array")
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyReplace implements the "replace" operation
|
||||
func (jp *JSONPatcher) applyReplace(doc map[string]interface{}, path string, value interface{}) (map[string]interface{}, error) {
|
||||
if path == "" {
|
||||
// Replace entire document
|
||||
if newDoc, ok := value.(map[string]interface{}); ok {
|
||||
return newDoc, nil
|
||||
}
|
||||
return nil, fmt.Errorf("cannot replace root with non-object")
|
||||
}
|
||||
|
||||
parts := jp.parsePath(path)
|
||||
parent, key, err := jp.navigateToParent(doc, parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if parentMap, ok := parent.(map[string]interface{}); ok {
|
||||
if _, exists := parentMap[key]; !exists {
|
||||
return nil, fmt.Errorf("path does not exist: %s", path)
|
||||
}
|
||||
parentMap[key] = value
|
||||
} else if parentSlice, ok := parent.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid array index: %s", key)
|
||||
}
|
||||
if index < 0 || index >= len(parentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
parentSlice[index] = value
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot replace in non-object/non-array")
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyMove implements the "move" operation
|
||||
func (jp *JSONPatcher) applyMove(doc map[string]interface{}, from, to string) (map[string]interface{}, error) {
|
||||
// Get value from source
|
||||
value, err := jp.getValue(doc, from)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("move source not found: %w", err)
|
||||
}
|
||||
|
||||
// Remove from source
|
||||
doc, err = jp.applyRemove(doc, from)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to remove from source: %w", err)
|
||||
}
|
||||
|
||||
// Add to destination
|
||||
doc, err = jp.applyAdd(doc, to, value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to add to destination: %w", err)
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyCopy implements the "copy" operation
|
||||
func (jp *JSONPatcher) applyCopy(doc map[string]interface{}, from, to string) (map[string]interface{}, error) {
|
||||
// Get value from source
|
||||
value, err := jp.getValue(doc, from)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("copy source not found: %w", err)
|
||||
}
|
||||
|
||||
// Add to destination
|
||||
doc, err = jp.applyAdd(doc, to, value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to add to destination: %w", err)
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyTest implements the "test" operation
|
||||
func (jp *JSONPatcher) applyTest(doc map[string]interface{}, path string, expectedValue interface{}) (map[string]interface{}, error) {
|
||||
actualValue, err := jp.getValue(doc, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("test path not found: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(actualValue, expectedValue) {
|
||||
return nil, fmt.Errorf("test failed: expected %v, got %v", expectedValue, actualValue)
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// getValue retrieves a value at the given JSON Pointer path
|
||||
func (jp *JSONPatcher) getValue(doc map[string]interface{}, path string) (interface{}, error) {
|
||||
if path == "" {
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
parts := jp.parsePath(path)
|
||||
current := interface{}(doc)
|
||||
|
||||
for _, part := range parts {
|
||||
if currentMap, ok := current.(map[string]interface{}); ok {
|
||||
var exists bool
|
||||
current, exists = currentMap[part]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("path not found: %s", path)
|
||||
}
|
||||
} else if currentSlice, ok := current.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(part)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid array index: %s", part)
|
||||
}
|
||||
if index < 0 || index >= len(currentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
current = currentSlice[index]
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot navigate through non-object/non-array")
|
||||
}
|
||||
}
|
||||
|
||||
return current, nil
|
||||
}
|
||||
|
||||
// navigateToParent navigates to the parent of the target path
|
||||
func (jp *JSONPatcher) navigateToParent(doc map[string]interface{}, parts []string) (interface{}, string, error) {
|
||||
if len(parts) == 0 {
|
||||
return nil, "", fmt.Errorf("no parent for root")
|
||||
}
|
||||
|
||||
if len(parts) == 1 {
|
||||
return doc, parts[0], nil
|
||||
}
|
||||
|
||||
parentPath := "/" + strings.Join(parts[:len(parts)-1], "/")
|
||||
parent, err := jp.getValue(doc, parentPath)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return parent, parts[len(parts)-1], nil
|
||||
}
|
||||
|
||||
// parsePath parses a JSON Pointer path into parts
|
||||
func (jp *JSONPatcher) parsePath(path string) []string {
|
||||
if path == "" {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(path, "/") {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
parts := strings.Split(path[1:], "/")
|
||||
|
||||
// Unescape JSON Pointer characters
|
||||
for i, part := range parts {
|
||||
part = strings.ReplaceAll(part, "~1", "/")
|
||||
part = strings.ReplaceAll(part, "~0", "~")
|
||||
parts[i] = part
|
||||
}
|
||||
|
||||
return parts
|
||||
}
|
573
json_patch_test.go
Normal file
573
json_patch_test.go
Normal file
@@ -0,0 +1,573 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestJSONPatcher_Add(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "add simple field",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/content",
|
||||
Value: "test content",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add nested field",
|
||||
doc: map[string]interface{}{"id": "test", "meta": map[string]interface{}{}},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/meta/priority",
|
||||
Value: "high",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "high"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add to existing field (overwrite)",
|
||||
doc: map[string]interface{}{"id": "test", "content": "old"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/content",
|
||||
Value: "new",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "new",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add array field",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/tags",
|
||||
Value: []interface{}{"tag1", "tag2"},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"tags": []interface{}{"tag1", "tag2"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add complex object",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/metadata",
|
||||
Value: map[string]interface{}{
|
||||
"created": "2025-01-01",
|
||||
"version": 1,
|
||||
},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"metadata": map[string]interface{}{
|
||||
"created": "2025-01-01",
|
||||
"version": 1,
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Remove(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "remove simple field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "remove",
|
||||
Path: "/content",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove nested field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "high", "tags": []string{"tag1"}},
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "remove",
|
||||
Path: "/meta/priority",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"tags": []string{"tag1"}},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "remove",
|
||||
Path: "/nonexistent",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Replace(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "replace simple field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "old content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "replace",
|
||||
Path: "/content",
|
||||
Value: "new content",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "new content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "replace nested field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "low"},
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "replace",
|
||||
Path: "/meta/priority",
|
||||
Value: "high",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "high"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "replace non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "replace",
|
||||
Path: "/nonexistent",
|
||||
Value: "value",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Test(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "test field success",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "test",
|
||||
Path: "/content",
|
||||
Value: "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test field failure",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "test",
|
||||
Path: "/content",
|
||||
Value: "different content",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "test non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "test",
|
||||
Path: "/nonexistent",
|
||||
Value: "value",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
_, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Move(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "move field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "move",
|
||||
From: "/content",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"title": "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "move non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "move",
|
||||
From: "/nonexistent",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Copy(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "copy field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "copy",
|
||||
From: "/content",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
"title": "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "copy non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "copy",
|
||||
From: "/nonexistent",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Complex(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patches []PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "multiple operations",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patches: []PatchOperation{
|
||||
{Op: "add", Path: "/content", Value: "test"},
|
||||
{Op: "add", Path: "/priority", Value: "high"},
|
||||
{Op: "replace", Path: "/content", Value: "updated test"},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "updated test",
|
||||
"priority": "high",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test then modify",
|
||||
doc: map[string]interface{}{"id": "test", "version": 1},
|
||||
patches: []PatchOperation{
|
||||
{Op: "test", Path: "/version", Value: 1},
|
||||
{Op: "replace", Path: "/version", Value: 2},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"version": 2,
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "failed test stops execution",
|
||||
doc: map[string]interface{}{"id": "test", "version": 1},
|
||||
patches: []PatchOperation{
|
||||
{Op: "test", Path: "/version", Value: 2}, // This should fail
|
||||
{Op: "replace", Path: "/version", Value: 3},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"version": 1,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, tt.patches)
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_ParsePath(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
path string
|
||||
expected []string
|
||||
}{
|
||||
{
|
||||
name: "empty path",
|
||||
path: "",
|
||||
expected: []string{},
|
||||
},
|
||||
{
|
||||
name: "root path",
|
||||
path: "/",
|
||||
expected: []string{""},
|
||||
},
|
||||
{
|
||||
name: "simple path",
|
||||
path: "/content",
|
||||
expected: []string{"content"},
|
||||
},
|
||||
{
|
||||
name: "nested path",
|
||||
path: "/meta/priority",
|
||||
expected: []string{"meta", "priority"},
|
||||
},
|
||||
{
|
||||
name: "path with escaped characters",
|
||||
path: "/content~1title",
|
||||
expected: []string{"content/title"},
|
||||
},
|
||||
{
|
||||
name: "path with escaped slash",
|
||||
path: "/content~0title",
|
||||
expected: []string{"content~title"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result := patcher.parsePath(tt.path)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_InvalidOperations(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
patch PatchOperation
|
||||
}{
|
||||
{
|
||||
name: "invalid operation",
|
||||
patch: PatchOperation{
|
||||
Op: "invalid",
|
||||
Path: "/content",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid path format",
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "content", // missing leading slash
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
doc := map[string]interface{}{"id": "test"}
|
||||
_, err := patcher.ApplyPatches(doc, []PatchOperation{tt.patch})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
}
|
20
main.go
20
main.go
@@ -19,6 +19,26 @@ func main() {
|
||||
logger.Info("Starting server")
|
||||
app = pocketbase.New()
|
||||
|
||||
// Initialize event store
|
||||
eventStore := NewSimpleEventStore(app)
|
||||
|
||||
// Setup collections on startup
|
||||
app.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
||||
// Setup database tables
|
||||
logger.Info("Setting up database tables...")
|
||||
if err := setupCollections(app); err != nil {
|
||||
logger.Error("Failed to setup database tables: %v", err)
|
||||
} else {
|
||||
logger.Info("Database tables setup complete")
|
||||
}
|
||||
|
||||
return se.Next()
|
||||
})
|
||||
|
||||
// Setup API routes
|
||||
setupAPIRoutes(app, eventStore)
|
||||
|
||||
// Serve static files
|
||||
app.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
||||
// serves static files from the provided public dir (if exists)
|
||||
se.Router.GET("/{path...}", apis.Static(os.DirFS("./pb_public"), false))
|
||||
|
146
merging.go
Normal file
146
merging.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pocketbase/pocketbase"
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
)
|
||||
|
||||
// MergeEventLog merges old events by resolving them into current state
|
||||
// and creating new create events with resolved data
|
||||
func (es *SimpleEventStore) MergeEventLog(cutoffDays int) error {
|
||||
cutoffTime := time.Now().AddDate(0, 0, -cutoffDays)
|
||||
|
||||
// Get all events older than cutoff
|
||||
oldEvents, err := es.app.FindRecordsByFilter("events", "timestamp < {:cutoff}", "seq", 10000, 0, map[string]any{"cutoff": cutoffTime})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get old events: %w", err)
|
||||
}
|
||||
|
||||
if len(oldEvents) == 0 {
|
||||
return nil // Nothing to merge
|
||||
}
|
||||
|
||||
// Get all collections that have events
|
||||
collections := make(map[string]bool)
|
||||
for _, event := range oldEvents {
|
||||
collections[event.GetString("collection")] = true
|
||||
}
|
||||
|
||||
// Get latest event to preserve sequence continuity
|
||||
latestEvent, err := es.GetLatestEvent()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get latest event: %w", err)
|
||||
}
|
||||
|
||||
nextSeq := 1
|
||||
prevHash := ""
|
||||
if latestEvent != nil {
|
||||
nextSeq = latestEvent.Seq + 1
|
||||
prevHash = latestEvent.Hash
|
||||
}
|
||||
|
||||
// For each collection, get current state and create consolidated create events
|
||||
for collectionName := range collections {
|
||||
items, err := es.GetAllItems(collectionName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get items for collection %s: %w", collectionName, err)
|
||||
}
|
||||
|
||||
// Create new create events for each existing item
|
||||
for _, item := range items {
|
||||
itemID, ok := item["id"].(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Create consolidated create event using JSON Patch "add" operations
|
||||
patches := []PatchOperation{}
|
||||
for key, value := range item {
|
||||
if key != "id" && key != "created_at" && key != "updated_at" { // Skip system fields
|
||||
patches = append(patches, PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/" + key,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Rethink merging for single operations
|
||||
consolidatedEvent := &Event{
|
||||
Seq: nextSeq,
|
||||
ItemID: itemID,
|
||||
Collection: collectionName,
|
||||
Operation: "add", // Placeholder - merging needs redesign
|
||||
Path: "/",
|
||||
Value: "consolidated",
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Generate new event ID and hash
|
||||
consolidatedEvent.EventID = generateEventID()
|
||||
consolidatedEvent.Hash = consolidatedEvent.calculateHash(prevHash)
|
||||
|
||||
// Save the consolidated event
|
||||
if err := es.saveEvent(consolidatedEvent); err != nil {
|
||||
return fmt.Errorf("failed to save consolidated event: %w", err)
|
||||
}
|
||||
|
||||
nextSeq++
|
||||
prevHash = consolidatedEvent.Hash
|
||||
}
|
||||
}
|
||||
|
||||
// Archive old events (save to backup file)
|
||||
if err := es.archiveEvents(oldEvents); err != nil {
|
||||
return fmt.Errorf("failed to archive old events: %w", err)
|
||||
}
|
||||
|
||||
// Delete old events
|
||||
for _, event := range oldEvents {
|
||||
if err := es.app.Delete(event); err != nil {
|
||||
return fmt.Errorf("failed to delete old event: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// archiveEvents saves old events to a backup file
|
||||
func (es *SimpleEventStore) archiveEvents(events []*core.Record) error {
|
||||
// For now, just log that we would archive
|
||||
// In a real implementation, you'd save to a file with timestamp
|
||||
fmt.Printf("Would archive %d events to backup file with timestamp %s\n",
|
||||
len(events), time.Now().Format("2006-01-02_15-04-05"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// ScheduleEventMerging sets up periodic event log merging
|
||||
func (es *SimpleEventStore) ScheduleEventMerging(app *pocketbase.PocketBase) {
|
||||
// This would typically use a job scheduler
|
||||
// For this basic implementation, we'll just provide the method
|
||||
// In production, you'd use something like cron or a job queue
|
||||
|
||||
// Example of how you might call it:
|
||||
// go func() {
|
||||
// ticker := time.NewTicker(24 * time.Hour)
|
||||
// defer ticker.Stop()
|
||||
// for {
|
||||
// select {
|
||||
// case <-ticker.C:
|
||||
// if err := es.MergeEventLog(2); err != nil {
|
||||
// // Log error
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
}
|
||||
|
||||
// generateEventID generates a new UUID for events
|
||||
func generateEventID() string {
|
||||
// Import uuid package at the top of file
|
||||
// For now, return a placeholder
|
||||
return "generated-uuid"
|
||||
}
|
125
migrations.go
Normal file
125
migrations.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
"github.com/pocketbase/pocketbase/tools/types"
|
||||
)
|
||||
|
||||
// setupCollections creates the required PocketBase collections for the event store
|
||||
func setupCollections(app core.App) error {
|
||||
fmt.Println("Creating PocketBase collections...")
|
||||
|
||||
// Create events collection if it doesn't exist
|
||||
if _, err := app.FindCollectionByNameOrId("events"); err != nil {
|
||||
fmt.Println("Creating events collection...")
|
||||
|
||||
eventsCollection := core.NewBaseCollection("events")
|
||||
|
||||
// Admin only access for events
|
||||
eventsCollection.ListRule = nil
|
||||
eventsCollection.ViewRule = nil
|
||||
eventsCollection.CreateRule = nil
|
||||
eventsCollection.UpdateRule = nil
|
||||
eventsCollection.DeleteRule = nil
|
||||
|
||||
// Add fields - ONE EVENT = ONE OPERATION
|
||||
eventsCollection.Fields.Add(&core.NumberField{
|
||||
Name: "seq",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "hash",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "item_id",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "event_id",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "collection",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "operation",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "path",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "value",
|
||||
Required: false,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "from",
|
||||
Required: false,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.DateField{
|
||||
Name: "timestamp",
|
||||
Required: true,
|
||||
})
|
||||
|
||||
// Add index on sequence number
|
||||
eventsCollection.AddIndex("idx_events_seq", false, "seq", "")
|
||||
|
||||
if err := app.Save(eventsCollection); err != nil {
|
||||
return fmt.Errorf("failed to create events collection: %w", err)
|
||||
}
|
||||
fmt.Println("✅ Events collection created successfully")
|
||||
} else {
|
||||
fmt.Println("Events collection already exists")
|
||||
}
|
||||
|
||||
// Create shopping_items collection if it doesn't exist
|
||||
if _, err := app.FindCollectionByNameOrId("shopping_items"); err != nil {
|
||||
fmt.Println("Creating shopping_items collection...")
|
||||
|
||||
itemsCollection := core.NewBaseCollection("shopping_items")
|
||||
|
||||
// Public access rules
|
||||
itemsCollection.ListRule = types.Pointer("")
|
||||
itemsCollection.ViewRule = types.Pointer("")
|
||||
itemsCollection.CreateRule = types.Pointer("")
|
||||
itemsCollection.UpdateRule = types.Pointer("")
|
||||
itemsCollection.DeleteRule = types.Pointer("")
|
||||
|
||||
// Add static fields only - no arbitrary fields allowed
|
||||
itemsCollection.Fields.Add(&core.TextField{
|
||||
Name: "content",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.TextField{
|
||||
Name: "priority",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.DateField{
|
||||
Name: "created_at",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.DateField{
|
||||
Name: "updated_at",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.DateField{
|
||||
Name: "deleted_at",
|
||||
Required: false,
|
||||
})
|
||||
|
||||
if err := app.Save(itemsCollection); err != nil {
|
||||
return fmt.Errorf("failed to create shopping_items collection: %w", err)
|
||||
}
|
||||
fmt.Println("✅ Shopping_items collection created successfully")
|
||||
} else {
|
||||
fmt.Println("Shopping_items collection already exists")
|
||||
}
|
||||
|
||||
fmt.Println("✅ PocketBase collections setup complete")
|
||||
return nil
|
||||
}
|
54
test_api.sh
Normal file
54
test_api.sh
Normal file
@@ -0,0 +1,54 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Simple API Tests - ONE OPERATION PER REQUEST
|
||||
echo "🚀 Testing Simple Event Store API..."
|
||||
|
||||
BASE_URL="http://localhost:8090"
|
||||
|
||||
echo "1. Check server health"
|
||||
curl -s "$BASE_URL/api/health"
|
||||
echo
|
||||
|
||||
echo "2. Get current state"
|
||||
curl -s "$BASE_URL/api/state"
|
||||
echo
|
||||
|
||||
echo "3. Create an item - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"add\", \"path\": \"/content\", \"value\": \"My test item\"}"
|
||||
echo
|
||||
|
||||
echo "4. Update the item - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"replace\", \"path\": \"/content\", \"value\": \"Updated test item\"}"
|
||||
echo
|
||||
|
||||
echo "5. Get all items"
|
||||
curl -s "$BASE_URL/api/items/shopping_items"
|
||||
echo
|
||||
|
||||
echo "6. Add another field - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"add\", \"path\": \"/priority\", \"value\": \"high\"}"
|
||||
echo
|
||||
|
||||
echo "7. Remove a field - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"remove\", \"path\": \"/priority\"}"
|
||||
echo
|
||||
|
||||
echo "8. Soft delete - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"add\", \"path\": \"/deleted_at\", \"value\": \"2025-09-29T10:00:00Z\"}"
|
||||
echo
|
||||
|
||||
echo "9. Final state"
|
||||
curl -s "$BASE_URL/api/items/shopping_items"
|
||||
echo
|
||||
|
||||
echo "✅ Simple API tests completed!"
|
60
types.go
60
types.go
@@ -1,12 +1,16 @@
|
||||
package main
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pocketbase/pocketbase"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
// Server generated sequence number of the event - ie when it was applied
|
||||
Seq int `json:"seq"`
|
||||
// Type of the event - create, update, delete, defined by the client
|
||||
Type string `json:"type"`
|
||||
// Hash of the event - server generated, gurantees the event was processed
|
||||
Hash string `json:"hash"`
|
||||
// ItemID of the item that is to be manipulated, defined by the client
|
||||
@@ -15,12 +19,23 @@ type Event struct {
|
||||
EventID string `json:"event_id"`
|
||||
// Collection of the item that is to be manipulated, defined by the client
|
||||
Collection string `json:"collection"`
|
||||
// Data that is to be used for manipulation; for create events that's the full objects and for update events that's the diff
|
||||
Data map[string]interface{} `json:"data"`
|
||||
// Single operation - ONE EVENT = ONE OPERATION
|
||||
Operation string `json:"operation"` // add, remove, replace, move, copy, test
|
||||
Path string `json:"path"` // JSON Pointer to target location
|
||||
Value string `json:"value"` // Value as string (for add/replace operations)
|
||||
From string `json:"from"` // Source path for move/copy operations
|
||||
// Timestamp of the event - server generated, when the event was processed
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// PatchOperation represents a single RFC6902 JSON Patch operation (still needed for JSONPatcher)
|
||||
type PatchOperation struct {
|
||||
Op string `json:"op"` // add, remove, replace, move, copy, test
|
||||
Path string `json:"path"` // JSON Pointer to target location
|
||||
Value interface{} `json:"value,omitempty"` // Value for add/replace operations
|
||||
From string `json:"from,omitempty"` // Source path for move/copy operations
|
||||
}
|
||||
|
||||
type SLItem struct {
|
||||
ID string
|
||||
Content string
|
||||
@@ -28,3 +43,38 @@ type SLItem struct {
|
||||
UpdatedAt time.Time
|
||||
DeletedAt time.Time
|
||||
}
|
||||
|
||||
type EventStore struct {
|
||||
app *pocketbase.PocketBase
|
||||
}
|
||||
|
||||
type SyncRequest struct {
|
||||
LastSeq int `json:"last_seq"`
|
||||
LastHash string `json:"last_hash"`
|
||||
}
|
||||
|
||||
type SyncResponse struct {
|
||||
Events []Event `json:"events"`
|
||||
CurrentSeq int `json:"current_seq"`
|
||||
CurrentHash string `json:"current_hash"`
|
||||
FullSync bool `json:"full_sync"`
|
||||
}
|
||||
|
||||
// Serialize event manually to ensure consistent field order for hashing
|
||||
func (e *Event) serialize() string {
|
||||
timestamp := e.Timestamp.Format(time.RFC3339Nano)
|
||||
|
||||
return fmt.Sprintf("seq:%d|item_id:%s|event_id:%s|collection:%s|operation:%s|path:%s|value:%s|from:%s|timestamp:%s",
|
||||
e.Seq, e.ItemID, e.EventID, e.Collection, e.Operation, e.Path, e.Value, e.From, timestamp)
|
||||
}
|
||||
|
||||
// Calculate hash of event plus previous hash
|
||||
func (e *Event) calculateHash(prevHash string) string {
|
||||
content := e.serialize() + "|prev_hash:" + prevHash
|
||||
hash := sha256.Sum256([]byte(content))
|
||||
return fmt.Sprintf("%x", hash)
|
||||
}
|
||||
|
||||
func NewEventStore(app *pocketbase.PocketBase) *EventStore {
|
||||
return &EventStore{app: app}
|
||||
}
|
||||
|
Reference in New Issue
Block a user