Compare commits
8 Commits
master
...
ff0d0b3bd3
Author | SHA1 | Date | |
---|---|---|---|
ff0d0b3bd3 | |||
6b7a519be9 | |||
607dd465a7 | |||
5f21d144c0 | |||
e9047ef2cb | |||
86c948beee | |||
7639706f5b | |||
c485f66476 |
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
*.exe
|
||||
pb_data
|
157
Spec.md
157
Spec.md
@@ -1,87 +1,103 @@
|
||||
# Event Log Based Store
|
||||
Event log based store
|
||||
|
||||
All data rows are reconstructed exclusively from an event log. All interactions with data rows must occur via events in the log. For performance, data rows are cached for quick lookup.
|
||||
The data rows of our table are to be recreated from an event log
|
||||
All interactions with the rows is to happen exclusively via events from/in the log
|
||||
For performance reasons we are to cache these data rows as well for quick lookup
|
||||
|
||||
Events are defined as:
|
||||
Events in the log are to take form of:
|
||||
type Event struct {
|
||||
Seq int `json:"seq"` // Server-generated sequence number (applied order)
|
||||
Hash string `json:"hash"` // Server-generated hash, guarantees event was processed
|
||||
ItemID string `json:"item_id"` // Client-defined item identifier
|
||||
EventID string `json:"event_id"` // Server-generated event identifier (uuid-v4)
|
||||
Collection string `json:"collection"` // Client-defined collection/table name
|
||||
Data string `json:"data"` // JSON array of RFC6902 patches
|
||||
Timestamp time.Time `json:"timestamp"` // Server-generated timestamp (when processed)
|
||||
// Server generated sequence number of the event - ie when it was applied
|
||||
Seq int `json:"seq"`
|
||||
// Type of the event - create, update, delete, defined by the client
|
||||
Type string `json:"type"`
|
||||
// Hash of the event - server generated, gurantees the event was processed
|
||||
Hash string `json:"hash"`
|
||||
// ItemID of the item that is to be manipulated, defined by the client
|
||||
ItemID string `json:"item_id"`
|
||||
// EventID of the event - server generated, gurantees the event was processed
|
||||
EventID string `json:"event_id"`
|
||||
// Collection of the item that is to be manipulated, defined by the client
|
||||
Collection string `json:"collection"`
|
||||
// Data that is to be used for manipulation; for create events that's the full objects and for update events that's the diff
|
||||
Data map[string]interface{} `json:"data"`
|
||||
// Timestamp of the event - server generated, when the event was processed
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
Events are divided into 3 types, create update and delete events
|
||||
Create events simply create the object as given in Data
|
||||
Delete events simply mark an object as deleted (not actually delete!) via its ItemID
|
||||
This simply means we set the DeletedAt field of the object to the current timestamp
|
||||
Updates to deleted events are processed as usual, we have no concept of a "deleted" item, only an item with a field set to a value
|
||||
Which is then filtered against when fetching and it so happens to be named "DeletedAt"
|
||||
Update events are to modify a field of a row and never more than one field
|
||||
Therefore its data is only the diff in the form of "age = 3"
|
||||
|
||||
When creating an event, only Data, Collection, and ItemID are required from the client. Hash, EventID, Seq, and Timestamp are computed server-side.
|
||||
When creating an event only the Type and ItemID must be provided
|
||||
Data is optional (delete events have no data)
|
||||
Hash, EventID and Seq are to be computed server side
|
||||
|
||||
Server-side event processing:
|
||||
- Retrieve the latest event for the collection.
|
||||
- Assign the next sequence number (incremented from the latest).
|
||||
- Generate a new EventID (uuid-v4).
|
||||
- Assign the current timestamp.
|
||||
- Compute the event hash as a function of the current event's data and the previous event's hash.
|
||||
- Serialize the event manually (not via json.Marshal or %+v) to ensure field order for hashing.
|
||||
- Apply the patch to the cached data row.
|
||||
On the server side with an incoming event:
|
||||
Grab the latest event
|
||||
Assign the event a sequence number that is incremented from the latest
|
||||
Create its EventID (generate a uuid-v4)
|
||||
Assign it a Timestamp
|
||||
Compute the hash from the dump of the current event PLUS the previous event's hash
|
||||
When serializing the event write the serialization function manually to ensure field order
|
||||
Do not use json serialize or %+v but manually string together the fields
|
||||
And only then apply the patch
|
||||
For create events that is insert objects
|
||||
For delete events that is mark objects as deleted
|
||||
For update events get the object, apply the diff and sav the object
|
||||
|
||||
Event log compaction:
|
||||
- Every 2 days, merge and compact the event log for each collection.
|
||||
- All events older than 2 days are resolved, and a new minimal event log is generated that produces the same state.
|
||||
- Sequence numbers (Seq) are never reset and always increment from the last value.
|
||||
- Before merging or deleting old events, save the original event log as a timestamped backup file.
|
||||
Events are to be periodically merged on the server
|
||||
Maybe set this cutoff to 2 or 3 days
|
||||
This means resolve all the events, delete the event log and generate an event log only having create events with the data we resolved
|
||||
Hopefully we will never have more than a few hundred of events
|
||||
Do NOT reset the seq number at any point, always increment from last
|
||||
|
||||
Client requirements:
|
||||
- Must be able to apply patches and fetch objects.
|
||||
- Must store:
|
||||
- last_seq: sequence number of the last processed event
|
||||
- last_hash: hash of the last processed event
|
||||
- events: local event log of all processed events
|
||||
- pending_events: locally generated events not yet sent to the server
|
||||
- On startup, fetch new events from the server since last_seq and apply them.
|
||||
- When modifying objects, generate events and append to pending_events.
|
||||
- Periodically or opportunistically send pending_events to the server.
|
||||
- Persist the event log (events and pending_events) locally.
|
||||
- If the server merges the event log, the client detects divergence by comparing last_seq and last_hash.
|
||||
- If sequence matches but hash differs, the server sends the full event log; the client reconstructs its state from this log.
|
||||
Maybe instead of deleting the event log save it somewhere just to have a backup
|
||||
Maybe cram them into a text file and save with timestamp
|
||||
Maybe don't delete/merge the whole event log but only "old" events like >2d
|
||||
While keeping the "new" events (<2d)
|
||||
|
||||
If the server merges the event log and the client has unsent local events:
|
||||
- Client fetches the merged events from the server.
|
||||
- Applies merged events to local state.
|
||||
- Reapplies unsent local events on top of the updated state.
|
||||
- Resends these events to the server.
|
||||
|
||||
If a client sends events after the event log has been merged:
|
||||
- The server accepts and applies these events as usual, regardless of the client's log state.
|
||||
|
||||
Merging the event log must not alter the resulting data state.
|
||||
|
||||
Required endpoints:
|
||||
|
||||
GET /api/<collection>/sync?last_seq=<last_seq>&last_hash=<last_hash>
|
||||
- Returns all events after the specified last_seq and last_hash.
|
||||
- If the provided seq and hash do not match the server's, returns the entire event log (client is out of sync).
|
||||
|
||||
PATCH /api/<collection>/events
|
||||
- Accepts a JSON array of RFC6902 patch objects.
|
||||
|
||||
Server processing:
|
||||
- As new events arrive, process the event log and update the cached state for the collection.
|
||||
- The current state is available for clients that do not wish to process the event log.
|
||||
- Only new events need to be applied to the current state; no need to reprocess the entire log each time.
|
||||
- Track the last event processed for each collection (sequence number and hash).
|
||||
|
||||
On startup, the server must:
|
||||
- Automatically create required collections: one for events and one for items (data state).
|
||||
- Events must be collection-agnostic and support any collection; at least one example collection is created at startup.
|
||||
- Ensure required columns exist in collections; if missing, reject PATCH requests with an error.
|
||||
- Each collection maintains its own sequence number, hash, and event log.
|
||||
On the client side we have to be able to apply patches and fetch objects
|
||||
The client is to keep a sequence number and hash of the last event it has processed
|
||||
When starting up ask the server for any new events since its last sequence number
|
||||
Get any new events and apply them to the local state
|
||||
When modifying objects generate events and append them to our local event log
|
||||
Periodically or when possible try to send those events to the server
|
||||
This means we have to keep saved the event log locally
|
||||
When the event log is merged on the server our local will diverge
|
||||
We will only know this by comparing the client hash and seq with the server hash and seq
|
||||
For example the client may have seq 127 and hash "abcd123" while the server, after merging, has seq 127 and hash "efgh456"
|
||||
Since on the server the seq 127 will have no previous event (merged - deleted)
|
||||
While on the client it will have some event
|
||||
At that point the server is to send the whole event log again and the client is to reconstruct it again
|
||||
|
||||
IF the server merged the event log and our client has events that have not yet been sent
|
||||
Then get the new events from server, apply them, and apply our local events on top of those
|
||||
And try to send them to server again
|
||||
On the server side if a client sends us events after we merged the event log
|
||||
We may simply naturally apply them even if the client was not operating on the merged event log
|
||||
At the end of the day merging the event log should make no changes to the data
|
||||
|
||||
---
|
||||
|
||||
|
||||
Actually for pocketbase we might want to generalize this
|
||||
Maybe create a "Collection" field as well and allow the events to manipulate any table...
|
||||
That way our Data isn't tied to a table...
|
||||
|
||||
|
||||
## RFC6902
|
||||
Wait actually we can use RFC6902
|
||||
"JSON patch standard"
|
||||
It defines a way to apply patches to JSON documents...
|
||||
Exactly what we need
|
||||
https://datatracker.ietf.org/doc/html/rfc6902
|
||||
|
||||
Some highlights:
|
||||
Operation objects MUST have exactly one "op" member, whose value
|
||||
indicates the operation to perform. Its value MUST be one of "add",
|
||||
"remove", "replace", "move", "copy", or "test"; other values are
|
||||
@@ -211,4 +227,9 @@ https://datatracker.ietf.org/doc/html/rfc6902
|
||||
|
||||
## test
|
||||
I think we don't care about this one
|
||||
|
||||
|
||||
|
||||
For this then use the PATCH http method
|
||||
And simply submit patches one by one
|
||||
"Patch" here is synonymous with our "event"
|
||||
Well nearly, a patch is part of an event
|
183
api.go
Normal file
183
api.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
)
|
||||
|
||||
// setupAPIRoutes sets up the event store API routes
|
||||
func setupAPIRoutes(app core.App, eventStore *SimpleEventStore) {
|
||||
app.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
||||
// JSON Patch endpoint using PATCH method - ONE OPERATION PER REQUEST
|
||||
se.Router.PATCH("/api/collections/{collection}/items/{itemId}", func(e *core.RequestEvent) error {
|
||||
collection := e.Request.PathValue("collection")
|
||||
itemID := e.Request.PathValue("itemId")
|
||||
|
||||
if collection == "" || itemID == "" {
|
||||
return e.BadRequestError("Collection and itemId are required", nil)
|
||||
}
|
||||
|
||||
var operation struct {
|
||||
Op string `json:"op"`
|
||||
Path string `json:"path"`
|
||||
Value string `json:"value"`
|
||||
From string `json:"from"`
|
||||
}
|
||||
if err := e.BindBody(&operation); err != nil {
|
||||
return e.BadRequestError("Failed to parse operation data", err)
|
||||
}
|
||||
|
||||
// Create event with single operation
|
||||
incomingEvent := &Event{
|
||||
ItemID: itemID,
|
||||
Collection: collection,
|
||||
Operation: operation.Op,
|
||||
Path: operation.Path,
|
||||
Value: operation.Value,
|
||||
From: operation.From,
|
||||
}
|
||||
|
||||
// Process the event
|
||||
processedEvent, err := eventStore.ProcessEvent(incomingEvent)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to process event", err)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, processedEvent)
|
||||
})
|
||||
|
||||
// Legacy POST endpoint for compatibility
|
||||
se.Router.POST("/api/events", func(e *core.RequestEvent) error {
|
||||
var incomingEvent Event
|
||||
if err := e.BindBody(&incomingEvent); err != nil {
|
||||
return e.BadRequestError("Failed to parse event data", err)
|
||||
}
|
||||
|
||||
// Validate required fields
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
return e.BadRequestError("Missing required fields: item_id, collection, operation", nil)
|
||||
}
|
||||
|
||||
// Process the event
|
||||
processedEvent, err := eventStore.ProcessEvent(&incomingEvent)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to process event", err)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusCreated, processedEvent)
|
||||
})
|
||||
|
||||
// Sync endpoint for clients
|
||||
se.Router.POST("/api/sync", func(e *core.RequestEvent) error {
|
||||
var syncReq SyncRequest
|
||||
if err := e.BindBody(&syncReq); err != nil {
|
||||
return e.BadRequestError("Failed to parse sync request", err)
|
||||
}
|
||||
|
||||
// Check if client is in sync
|
||||
isValid, err := eventStore.ValidateSync(syncReq.LastSeq, syncReq.LastHash)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to validate sync", err)
|
||||
}
|
||||
|
||||
var response SyncResponse
|
||||
|
||||
if !isValid {
|
||||
// Full sync needed - send all events
|
||||
events, err := eventStore.GetEventsSince(0)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get events", err)
|
||||
}
|
||||
response.Events = events
|
||||
response.FullSync = true
|
||||
} else {
|
||||
// Incremental sync - send events since last sequence
|
||||
events, err := eventStore.GetEventsSince(syncReq.LastSeq)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get events", err)
|
||||
}
|
||||
response.Events = events
|
||||
response.FullSync = false
|
||||
}
|
||||
|
||||
// Get current state
|
||||
latestEvent, err := eventStore.GetLatestEvent()
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get latest event", err)
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response.CurrentSeq = latestEvent.Seq
|
||||
response.CurrentHash = latestEvent.Hash
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, response)
|
||||
})
|
||||
|
||||
// Get all items endpoint
|
||||
se.Router.GET("/api/items/{collection}", func(e *core.RequestEvent) error {
|
||||
collection := e.Request.PathValue("collection")
|
||||
if collection == "" {
|
||||
return e.BadRequestError("Collection name required", nil)
|
||||
}
|
||||
|
||||
items, err := eventStore.GetAllItems(collection)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get items", err)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, map[string]interface{}{
|
||||
"items": items,
|
||||
})
|
||||
})
|
||||
|
||||
// Batch events endpoint for client to send multiple events
|
||||
se.Router.POST("/api/events/batch", func(e *core.RequestEvent) error {
|
||||
var events []Event
|
||||
if err := e.BindBody(&events); err != nil {
|
||||
return e.BadRequestError("Failed to parse events data", err)
|
||||
}
|
||||
|
||||
processedEvents := make([]Event, 0, len(events))
|
||||
for _, incomingEvent := range events {
|
||||
// Validate required fields
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
return e.BadRequestError("Missing required fields in event: item_id, collection, operation", nil)
|
||||
}
|
||||
|
||||
processedEvent, err := eventStore.ProcessEvent(&incomingEvent)
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to process event", err)
|
||||
}
|
||||
processedEvents = append(processedEvents, *processedEvent)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusCreated, map[string]interface{}{
|
||||
"events": processedEvents,
|
||||
})
|
||||
})
|
||||
|
||||
// Get current state endpoint
|
||||
se.Router.GET("/api/state", func(e *core.RequestEvent) error {
|
||||
latestEvent, err := eventStore.GetLatestEvent()
|
||||
if err != nil {
|
||||
return e.InternalServerError("Failed to get latest event", err)
|
||||
}
|
||||
|
||||
response := map[string]interface{}{
|
||||
"seq": 0,
|
||||
"hash": "",
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response["seq"] = latestEvent.Seq
|
||||
response["hash"] = latestEvent.Hash
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, response)
|
||||
})
|
||||
|
||||
return se.Next()
|
||||
})
|
||||
}
|
627
api_test.go
Normal file
627
api_test.go
Normal file
@@ -0,0 +1,627 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// MockSimpleEventStore for API testing
|
||||
type MockAPIEventStore struct {
|
||||
events []Event
|
||||
items map[string]map[string]interface{}
|
||||
nextSeq int
|
||||
}
|
||||
|
||||
func NewMockAPIEventStore() *MockAPIEventStore {
|
||||
return &MockAPIEventStore{
|
||||
events: []Event{},
|
||||
items: make(map[string]map[string]interface{}),
|
||||
nextSeq: 1,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) GetLatestEvent() (*Event, error) {
|
||||
if len(m.events) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return &m.events[len(m.events)-1], nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
||||
event := &Event{
|
||||
Seq: m.nextSeq,
|
||||
Hash: "mock_hash_" + string(rune(m.nextSeq+'0')),
|
||||
ItemID: incomingEvent.ItemID,
|
||||
EventID: "mock_event_" + string(rune(m.nextSeq+'0')),
|
||||
Collection: incomingEvent.Collection,
|
||||
Operation: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
m.events = append(m.events, *event)
|
||||
m.nextSeq++
|
||||
|
||||
// Apply operation to items for testing
|
||||
itemKey := incomingEvent.Collection + ":" + incomingEvent.ItemID
|
||||
if m.items[itemKey] == nil {
|
||||
m.items[itemKey] = map[string]interface{}{"id": incomingEvent.ItemID}
|
||||
}
|
||||
|
||||
// Simple mock application of JSON Patch operations
|
||||
switch incomingEvent.Operation {
|
||||
case "add", "replace":
|
||||
field := incomingEvent.Path[1:] // remove leading "/"
|
||||
m.items[itemKey][field] = incomingEvent.Value
|
||||
case "remove":
|
||||
field := incomingEvent.Path[1:]
|
||||
delete(m.items[itemKey], field)
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
var events []Event
|
||||
for _, event := range m.events {
|
||||
if event.Seq > seq {
|
||||
events = append(events, event)
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
var items []map[string]interface{}
|
||||
for key, item := range m.items {
|
||||
if len(key) >= len(collection) && key[:len(collection)] == collection {
|
||||
// Skip soft-deleted items
|
||||
if deletedAt, exists := item["deleted_at"]; !exists || deletedAt == "" {
|
||||
items = append(items, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (m *MockAPIEventStore) ValidateSync(seq int, hash string) (bool, error) {
|
||||
if len(m.events) == 0 {
|
||||
return seq == 0 && hash == "", nil
|
||||
}
|
||||
latest := m.events[len(m.events)-1]
|
||||
return latest.Seq == seq && latest.Hash == hash, nil
|
||||
}
|
||||
|
||||
func createTestRouter(eventStore *MockAPIEventStore) *http.ServeMux {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
// PATCH endpoint for single operations
|
||||
mux.HandleFunc("PATCH /api/collections/{collection}/items/{itemId}", func(w http.ResponseWriter, r *http.Request) {
|
||||
collection := r.PathValue("collection")
|
||||
itemID := r.PathValue("itemId")
|
||||
|
||||
if collection == "" || itemID == "" {
|
||||
http.Error(w, "Collection and itemId are required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var operation struct {
|
||||
Op string `json:"op"`
|
||||
Path string `json:"path"`
|
||||
Value string `json:"value"`
|
||||
From string `json:"from"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&operation); err != nil {
|
||||
http.Error(w, "Failed to parse operation data", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
incomingEvent := &Event{
|
||||
ItemID: itemID,
|
||||
Collection: collection,
|
||||
Operation: operation.Op,
|
||||
Path: operation.Path,
|
||||
Value: operation.Value,
|
||||
From: operation.From,
|
||||
}
|
||||
|
||||
processedEvent, err := eventStore.ProcessEvent(incomingEvent)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to process event", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(processedEvent)
|
||||
})
|
||||
|
||||
// POST endpoint for legacy events
|
||||
mux.HandleFunc("POST /api/events", func(w http.ResponseWriter, r *http.Request) {
|
||||
var incomingEvent Event
|
||||
if err := json.NewDecoder(r.Body).Decode(&incomingEvent); err != nil {
|
||||
http.Error(w, "Failed to parse event data", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if incomingEvent.ItemID == "" || incomingEvent.Collection == "" || incomingEvent.Operation == "" {
|
||||
http.Error(w, "Missing required fields: item_id, collection, operation", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
processedEvent, err := eventStore.ProcessEvent(&incomingEvent)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to process event", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
json.NewEncoder(w).Encode(processedEvent)
|
||||
})
|
||||
|
||||
// Sync endpoint
|
||||
mux.HandleFunc("POST /api/sync", func(w http.ResponseWriter, r *http.Request) {
|
||||
var syncReq SyncRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&syncReq); err != nil {
|
||||
http.Error(w, "Failed to parse sync request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
isValid, err := eventStore.ValidateSync(syncReq.LastSeq, syncReq.LastHash)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to validate sync", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
events, err := eventStore.GetEventsSince(syncReq.LastSeq)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to get events", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
latestEvent, _ := eventStore.GetLatestEvent()
|
||||
|
||||
response := SyncResponse{
|
||||
Events: events,
|
||||
FullSync: !isValid,
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response.CurrentSeq = latestEvent.Seq
|
||||
response.CurrentHash = latestEvent.Hash
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
// Get items endpoint
|
||||
mux.HandleFunc("GET /api/items/{collection}", func(w http.ResponseWriter, r *http.Request) {
|
||||
collection := r.PathValue("collection")
|
||||
|
||||
items, err := eventStore.GetAllItems(collection)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to get items", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response := map[string]interface{}{"items": items}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
// Health endpoint
|
||||
mux.HandleFunc("GET /api/health", func(w http.ResponseWriter, r *http.Request) {
|
||||
response := map[string]interface{}{
|
||||
"message": "API is healthy.",
|
||||
"code": 200,
|
||||
"data": map[string]interface{}{},
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
// State endpoint
|
||||
mux.HandleFunc("GET /api/state", func(w http.ResponseWriter, r *http.Request) {
|
||||
latestEvent, _ := eventStore.GetLatestEvent()
|
||||
|
||||
response := map[string]interface{}{
|
||||
"seq": 0,
|
||||
"hash": "",
|
||||
}
|
||||
|
||||
if latestEvent != nil {
|
||||
response["seq"] = latestEvent.Seq
|
||||
response["hash"] = latestEvent.Hash
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
})
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
func TestAPI_PatchEndpoint_CreateItem(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
operation := map[string]string{
|
||||
"op": "add",
|
||||
"path": "/content",
|
||||
"value": "test item content",
|
||||
}
|
||||
body, _ := json.Marshal(operation)
|
||||
|
||||
req := httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response Event
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", response.ItemID)
|
||||
assert.Equal(t, "shopping_items", response.Collection)
|
||||
assert.Equal(t, "add", response.Operation)
|
||||
assert.Equal(t, "/content", response.Path)
|
||||
assert.Equal(t, "test item content", response.Value)
|
||||
assert.Equal(t, 1, response.Seq)
|
||||
}
|
||||
|
||||
func TestAPI_PatchEndpoint_UpdateItem(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Create item first
|
||||
createOp := map[string]string{
|
||||
"op": "add",
|
||||
"path": "/content",
|
||||
"value": "original content",
|
||||
}
|
||||
body, _ := json.Marshal(createOp)
|
||||
req := httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
// Update item
|
||||
updateOp := map[string]string{
|
||||
"op": "replace",
|
||||
"path": "/content",
|
||||
"value": "updated content",
|
||||
}
|
||||
body, _ = json.Marshal(updateOp)
|
||||
req = httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w = httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response Event
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", response.ItemID)
|
||||
assert.Equal(t, "replace", response.Operation)
|
||||
assert.Equal(t, "/content", response.Path)
|
||||
assert.Equal(t, "updated content", response.Value)
|
||||
assert.Equal(t, 2, response.Seq)
|
||||
}
|
||||
|
||||
func TestAPI_PatchEndpoint_InvalidData(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
req := httptest.NewRequest("PATCH", "/api/collections/shopping_items/items/test123", bytes.NewReader([]byte("invalid json")))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
}
|
||||
|
||||
func TestAPI_PostEndpoint_LegacyEvent(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
event := Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test content",
|
||||
}
|
||||
body, _ := json.Marshal(event)
|
||||
|
||||
req := httptest.NewRequest("POST", "/api/events", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusCreated, w.Code)
|
||||
|
||||
var response Event
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", response.ItemID)
|
||||
assert.Equal(t, "add", response.Operation)
|
||||
}
|
||||
|
||||
func TestAPI_PostEndpoint_MissingFields(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
event Event
|
||||
}{
|
||||
{
|
||||
name: "missing item_id",
|
||||
event: Event{
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing collection",
|
||||
event: Event{
|
||||
ItemID: "test123",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing operation",
|
||||
event: Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
body, _ := json.Marshal(tt.event)
|
||||
req := httptest.NewRequest("POST", "/api/events", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPI_SyncEndpoint_FullSync(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Add some events first
|
||||
event1 := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event1)
|
||||
|
||||
event2 := &Event{
|
||||
ItemID: "item2",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 2",
|
||||
}
|
||||
mockStore.ProcessEvent(event2)
|
||||
|
||||
// Request sync from beginning
|
||||
syncReq := SyncRequest{
|
||||
LastSeq: 0,
|
||||
LastHash: "",
|
||||
}
|
||||
body, _ := json.Marshal(syncReq)
|
||||
|
||||
req := httptest.NewRequest("POST", "/api/sync", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response SyncResponse
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, response.Events, 2)
|
||||
assert.Equal(t, 2, response.CurrentSeq)
|
||||
}
|
||||
|
||||
func TestAPI_SyncEndpoint_IncrementalSync(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Add some events first
|
||||
event1 := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event1)
|
||||
|
||||
event2 := &Event{
|
||||
ItemID: "item2",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 2",
|
||||
}
|
||||
mockStore.ProcessEvent(event2)
|
||||
|
||||
// Request sync from seq 1
|
||||
syncReq := SyncRequest{
|
||||
LastSeq: 1,
|
||||
LastHash: "mock_hash_1",
|
||||
}
|
||||
body, _ := json.Marshal(syncReq)
|
||||
|
||||
req := httptest.NewRequest("POST", "/api/sync", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response SyncResponse
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, response.Events, 1) // Only event 2
|
||||
assert.Equal(t, 2, response.CurrentSeq)
|
||||
}
|
||||
|
||||
func TestAPI_GetItemsEndpoint(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Add some items
|
||||
event1 := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event1)
|
||||
|
||||
event2 := &Event{
|
||||
ItemID: "item2",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 2",
|
||||
}
|
||||
mockStore.ProcessEvent(event2)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/items/shopping_items", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response map[string]interface{}
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
|
||||
items, ok := response["items"].([]interface{})
|
||||
require.True(t, ok)
|
||||
assert.Len(t, items, 2)
|
||||
}
|
||||
|
||||
func TestAPI_GetStateEndpoint(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
// Test empty state
|
||||
req := httptest.NewRequest("GET", "/api/state", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
var response map[string]interface{}
|
||||
err := json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, float64(0), response["seq"])
|
||||
assert.Equal(t, "", response["hash"])
|
||||
|
||||
// Add an event
|
||||
event := &Event{
|
||||
ItemID: "item1",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Item 1",
|
||||
}
|
||||
mockStore.ProcessEvent(event)
|
||||
|
||||
// Test state with event
|
||||
req = httptest.NewRequest("GET", "/api/state", nil)
|
||||
w = httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusOK, w.Code)
|
||||
|
||||
err = json.NewDecoder(w.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, float64(1), response["seq"])
|
||||
assert.NotEmpty(t, response["hash"])
|
||||
}
|
||||
|
||||
func TestAPI_PathValues(t *testing.T) {
|
||||
mockStore := NewMockAPIEventStore()
|
||||
router := createTestRouter(mockStore)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
url string
|
||||
method string
|
||||
expectedStatus int
|
||||
}{
|
||||
{
|
||||
name: "valid patch path",
|
||||
url: "/api/collections/shopping_items/items/test123",
|
||||
method: "PATCH",
|
||||
expectedStatus: http.StatusOK, // Valid operation should succeed
|
||||
},
|
||||
{
|
||||
name: "valid get items path",
|
||||
url: "/api/items/shopping_items",
|
||||
method: "GET",
|
||||
expectedStatus: http.StatusOK,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var req *http.Request
|
||||
if tt.method == "PATCH" {
|
||||
req = httptest.NewRequest(tt.method, tt.url, bytes.NewReader([]byte("{}")))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
} else {
|
||||
req = httptest.NewRequest(tt.method, tt.url, nil)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, tt.expectedStatus, w.Code)
|
||||
})
|
||||
}
|
||||
}
|
331
event_store_simple.go
Normal file
331
event_store_simple.go
Normal file
@@ -0,0 +1,331 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pocketbase/dbx"
|
||||
"github.com/pocketbase/pocketbase"
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
)
|
||||
|
||||
// SimpleEventStore uses direct SQL queries for better compatibility
|
||||
type SimpleEventStore struct {
|
||||
app *pocketbase.PocketBase
|
||||
}
|
||||
|
||||
func NewSimpleEventStore(app *pocketbase.PocketBase) *SimpleEventStore {
|
||||
return &SimpleEventStore{app: app}
|
||||
}
|
||||
|
||||
// GetLatestEvent returns the latest event from the event log
|
||||
func (es *SimpleEventStore) GetLatestEvent() (*Event, error) {
|
||||
records, err := es.app.FindRecordsByFilter("events", "", "-seq", 1, 0, map[string]any{})
|
||||
if err != nil || len(records) == 0 {
|
||||
return nil, nil // No events found
|
||||
}
|
||||
|
||||
record := records[0]
|
||||
event := &Event{
|
||||
Seq: record.GetInt("seq"),
|
||||
Hash: record.GetString("hash"),
|
||||
ItemID: record.GetString("item_id"),
|
||||
EventID: record.GetString("event_id"),
|
||||
Collection: record.GetString("collection"),
|
||||
Operation: record.GetString("operation"),
|
||||
Path: record.GetString("path"),
|
||||
Value: record.GetString("value"),
|
||||
From: record.GetString("from"),
|
||||
}
|
||||
|
||||
// Parse timestamp
|
||||
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
||||
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
||||
event.Timestamp = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// ProcessEvent processes an incoming event and applies it to the store
|
||||
func (es *SimpleEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
||||
// Get latest event for sequence number and hash chaining
|
||||
latestEvent, err := es.GetLatestEvent()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get latest event: %w", err)
|
||||
}
|
||||
|
||||
// Prepare the event
|
||||
event := &Event{
|
||||
ItemID: incomingEvent.ItemID,
|
||||
Collection: incomingEvent.Collection,
|
||||
Operation: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
EventID: uuid.New().String(),
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Set sequence number
|
||||
if latestEvent == nil {
|
||||
event.Seq = 1
|
||||
} else {
|
||||
event.Seq = latestEvent.Seq + 1
|
||||
}
|
||||
|
||||
// Calculate hash
|
||||
prevHash := ""
|
||||
if latestEvent != nil {
|
||||
prevHash = latestEvent.Hash
|
||||
}
|
||||
event.Hash = event.calculateHash(prevHash)
|
||||
|
||||
// Save event to event log
|
||||
if err := es.saveEvent(event); err != nil {
|
||||
return nil, fmt.Errorf("failed to save event: %w", err)
|
||||
}
|
||||
|
||||
// Apply the event to cached data
|
||||
if err := es.applyEvent(event); err != nil {
|
||||
return nil, fmt.Errorf("failed to apply event: %w", err)
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// saveEvent saves an event to the events collection
|
||||
func (es *SimpleEventStore) saveEvent(event *Event) error {
|
||||
collection, err := es.app.FindCollectionByNameOrId("events")
|
||||
if err != nil {
|
||||
return fmt.Errorf("events collection not found: %w", err)
|
||||
}
|
||||
|
||||
record := core.NewRecord(collection)
|
||||
record.Set("seq", event.Seq)
|
||||
record.Set("hash", event.Hash)
|
||||
record.Set("item_id", event.ItemID)
|
||||
record.Set("event_id", event.EventID)
|
||||
record.Set("collection", event.Collection)
|
||||
record.Set("operation", event.Operation)
|
||||
record.Set("path", event.Path)
|
||||
record.Set("value", event.Value)
|
||||
record.Set("from", event.From)
|
||||
record.Set("timestamp", event.Timestamp.Format(time.RFC3339))
|
||||
|
||||
return es.app.Save(record)
|
||||
}
|
||||
|
||||
// applyEvent applies a single operation to the cached data
|
||||
func (es *SimpleEventStore) applyEvent(event *Event) error {
|
||||
// Get current document state
|
||||
currentDoc, err := es.getCurrentDocument(event.Collection, event.ItemID)
|
||||
if err != nil {
|
||||
// If document doesn't exist, create empty one
|
||||
currentDoc = map[string]interface{}{
|
||||
"id": event.ItemID,
|
||||
}
|
||||
}
|
||||
|
||||
// Apply single operation
|
||||
patcher := &JSONPatcher{}
|
||||
patches := []PatchOperation{{
|
||||
Op: event.Operation,
|
||||
Path: event.Path,
|
||||
Value: event.Value,
|
||||
From: event.From,
|
||||
}}
|
||||
|
||||
updatedDoc, err := patcher.ApplyPatches(currentDoc, patches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to apply operation: %w", err)
|
||||
}
|
||||
|
||||
// Update timestamp
|
||||
updatedDoc["updated_at"] = event.Timestamp.Format(time.RFC3339)
|
||||
|
||||
// Set created_at if this is a new document
|
||||
if _, exists := currentDoc["created_at"]; !exists {
|
||||
updatedDoc["created_at"] = event.Timestamp.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
// Save the updated document
|
||||
return es.saveDocument(event.Collection, event.ItemID, updatedDoc)
|
||||
}
|
||||
|
||||
// getCurrentDocument retrieves the current state of a document
|
||||
func (es *SimpleEventStore) getCurrentDocument(collection, itemID string) (map[string]interface{}, error) {
|
||||
var result map[string]interface{}
|
||||
|
||||
rows, err := es.app.DB().
|
||||
Select("*").
|
||||
From(collection).
|
||||
Where(dbx.HashExp{"id": itemID}).
|
||||
Limit(1).
|
||||
Rows()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return nil, fmt.Errorf("document not found")
|
||||
}
|
||||
|
||||
// Get column names
|
||||
columns, err := rows.Columns()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Scan values
|
||||
values := make([]interface{}, len(columns))
|
||||
valuePtrs := make([]interface{}, len(columns))
|
||||
for i := range values {
|
||||
valuePtrs[i] = &values[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(valuePtrs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create map
|
||||
result = make(map[string]interface{})
|
||||
for i, col := range columns {
|
||||
result[col] = values[i]
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// saveDocument saves a document to the collection
|
||||
func (es *SimpleEventStore) saveDocument(collectionName, itemID string, doc map[string]interface{}) error {
|
||||
collection, err := es.app.FindCollectionByNameOrId(collectionName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("collection %s not found: %w", collectionName, err)
|
||||
}
|
||||
|
||||
// Try to find existing record
|
||||
record, err := es.app.FindFirstRecordByFilter(collectionName, "id = {:id}", map[string]any{"id": itemID})
|
||||
if err != nil {
|
||||
// Record doesn't exist, create new one
|
||||
record = core.NewRecord(collection)
|
||||
record.Set("id", itemID)
|
||||
}
|
||||
|
||||
// Set all fields from doc
|
||||
for key, value := range doc {
|
||||
if key != "id" { // Don't overwrite the ID
|
||||
record.Set(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
return es.app.Save(record)
|
||||
}
|
||||
|
||||
// GetEventsSince returns events since the given sequence number
|
||||
func (es *SimpleEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
records, err := es.app.FindRecordsByFilter("events", "seq > {:seq}", "seq", 1000, 0, map[string]any{"seq": seq})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch events: %w", err)
|
||||
}
|
||||
|
||||
events := make([]Event, len(records))
|
||||
for i, record := range records {
|
||||
events[i] = Event{
|
||||
Seq: record.GetInt("seq"),
|
||||
Hash: record.GetString("hash"),
|
||||
ItemID: record.GetString("item_id"),
|
||||
EventID: record.GetString("event_id"),
|
||||
Collection: record.GetString("collection"),
|
||||
Operation: record.GetString("operation"),
|
||||
Path: record.GetString("path"),
|
||||
Value: record.GetString("value"),
|
||||
From: record.GetString("from"),
|
||||
}
|
||||
|
||||
// Parse timestamp
|
||||
if timestampStr := record.GetString("timestamp"); timestampStr != "" {
|
||||
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
|
||||
events[i].Timestamp = timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// GetAllItems returns all non-deleted items from a collection
|
||||
func (es *SimpleEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
var items []map[string]interface{}
|
||||
|
||||
rows, err := es.app.DB().
|
||||
Select("*").
|
||||
From(collection).
|
||||
Where(dbx.NewExp("deleted_at IS NULL OR deleted_at = ''")).
|
||||
OrderBy("created_at DESC").
|
||||
Rows()
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch items: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Get column names
|
||||
columns, err := rows.Columns()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
// Create slice to hold values
|
||||
values := make([]interface{}, len(columns))
|
||||
valuePtrs := make([]interface{}, len(columns))
|
||||
for i := range values {
|
||||
valuePtrs[i] = &values[i]
|
||||
}
|
||||
|
||||
// Scan into values
|
||||
if err := rows.Scan(valuePtrs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create map from columns and values
|
||||
item := make(map[string]interface{})
|
||||
for i, col := range columns {
|
||||
item[col] = values[i]
|
||||
}
|
||||
|
||||
items = append(items, item)
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// ValidateSync checks if client sync state matches server state
|
||||
func (es *SimpleEventStore) ValidateSync(clientSeq int, clientHash string) (bool, error) {
|
||||
if clientSeq == 0 {
|
||||
return false, nil // Full sync needed
|
||||
}
|
||||
|
||||
var hash string
|
||||
err := es.app.DB().
|
||||
Select("hash").
|
||||
From("events").
|
||||
Where(dbx.NewExp("seq = {:seq}", map[string]any{"seq": clientSeq})).
|
||||
Limit(1).
|
||||
One(&hash)
|
||||
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil // Event not found, full sync needed
|
||||
}
|
||||
return false, fmt.Errorf("failed to validate sync: %w", err)
|
||||
}
|
||||
|
||||
return hash == clientHash, nil
|
||||
}
|
407
event_store_test.go
Normal file
407
event_store_test.go
Normal file
@@ -0,0 +1,407 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEvent_Serialize(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
event Event
|
||||
}{
|
||||
{
|
||||
name: "simple event",
|
||||
event: Event{
|
||||
Seq: 1,
|
||||
ItemID: "test123",
|
||||
EventID: "event-123",
|
||||
Collection: "items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test value",
|
||||
From: "",
|
||||
Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "event with multiple fields",
|
||||
event: Event{
|
||||
Seq: 2,
|
||||
ItemID: "test456",
|
||||
EventID: "event-456",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/priority",
|
||||
Value: "high",
|
||||
From: "/old_priority",
|
||||
Timestamp: time.Date(2025, 1, 2, 12, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
serialized := tt.event.serialize()
|
||||
|
||||
// Check that all fields are present in serialized string
|
||||
assert.Contains(t, serialized, "seq:")
|
||||
assert.Contains(t, serialized, "item_id:")
|
||||
assert.Contains(t, serialized, "event_id:")
|
||||
assert.Contains(t, serialized, "collection:")
|
||||
assert.Contains(t, serialized, "operation:")
|
||||
assert.Contains(t, serialized, "path:")
|
||||
assert.Contains(t, serialized, "value:")
|
||||
assert.Contains(t, serialized, "from:")
|
||||
assert.Contains(t, serialized, "timestamp:")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvent_CalculateHash(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
event Event
|
||||
prevHash string
|
||||
}{
|
||||
{
|
||||
name: "first event (no previous hash)",
|
||||
event: Event{
|
||||
Seq: 1,
|
||||
ItemID: "test123",
|
||||
EventID: "event-123",
|
||||
Collection: "items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test value",
|
||||
Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
prevHash: "",
|
||||
},
|
||||
{
|
||||
name: "second event (with previous hash)",
|
||||
event: Event{
|
||||
Seq: 2,
|
||||
ItemID: "test123",
|
||||
EventID: "event-124",
|
||||
Collection: "items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "updated value",
|
||||
Timestamp: time.Date(2025, 1, 1, 1, 0, 0, 0, time.UTC),
|
||||
},
|
||||
prevHash: "previous_hash_123",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
hash := tt.event.calculateHash(tt.prevHash)
|
||||
|
||||
// Hash should not be empty
|
||||
assert.NotEmpty(t, hash)
|
||||
|
||||
// Hash should be deterministic
|
||||
hash2 := tt.event.calculateHash(tt.prevHash)
|
||||
assert.Equal(t, hash, hash2)
|
||||
|
||||
// Different previous hash should produce different result
|
||||
if tt.prevHash == "" {
|
||||
differentHash := tt.event.calculateHash("different")
|
||||
assert.NotEqual(t, hash, differentHash)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvent_CalculateHash_Consistency(t *testing.T) {
|
||||
event := Event{
|
||||
Seq: 1,
|
||||
ItemID: "test123",
|
||||
EventID: "event-123",
|
||||
Collection: "items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "test value",
|
||||
Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
}
|
||||
|
||||
// Same event should always produce same hash
|
||||
hash1 := event.calculateHash("prev")
|
||||
hash2 := event.calculateHash("prev")
|
||||
assert.Equal(t, hash1, hash2)
|
||||
|
||||
// Different previous hash should produce different hash
|
||||
hash3 := event.calculateHash("different_prev")
|
||||
assert.NotEqual(t, hash1, hash3)
|
||||
}
|
||||
|
||||
// Mock Event Store for testing
|
||||
type MockEventStore struct {
|
||||
events []Event
|
||||
latestEvent *Event
|
||||
processError error
|
||||
syncValid bool
|
||||
}
|
||||
|
||||
func (m *MockEventStore) ProcessEvent(event *Event) (*Event, error) {
|
||||
if m.processError != nil {
|
||||
return nil, m.processError
|
||||
}
|
||||
|
||||
processedEvent := &Event{
|
||||
Seq: len(m.events) + 1,
|
||||
Hash: "mock_hash",
|
||||
ItemID: event.ItemID,
|
||||
EventID: "mock_event_id",
|
||||
Collection: event.Collection,
|
||||
Operation: event.Operation,
|
||||
Path: event.Path,
|
||||
Value: event.Value,
|
||||
From: event.From,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
m.events = append(m.events, *processedEvent)
|
||||
m.latestEvent = processedEvent
|
||||
return processedEvent, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) GetLatestEvent() (*Event, error) {
|
||||
return m.latestEvent, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
var events []Event
|
||||
for _, event := range m.events {
|
||||
if event.Seq > seq {
|
||||
events = append(events, event)
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) ValidateSync(seq int, hash string) (bool, error) {
|
||||
return m.syncValid, nil
|
||||
}
|
||||
|
||||
func (m *MockEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
// Mock implementation
|
||||
items := []map[string]interface{}{
|
||||
{"id": "item1", "content": "test item 1"},
|
||||
{"id": "item2", "content": "test item 2"},
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func TestEventProcessing_CreateItem(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
event := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "My test item",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/content", result.Path)
|
||||
assert.Equal(t, "My test item", result.Value)
|
||||
assert.Equal(t, 1, result.Seq)
|
||||
}
|
||||
|
||||
func TestEventProcessing_UpdateItem(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
// Create item first
|
||||
createEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Original content",
|
||||
}
|
||||
_, err := mockStore.ProcessEvent(createEvent)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Update item
|
||||
updateEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "Updated content",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(updateEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, "replace", result.Operation)
|
||||
assert.Equal(t, "/content", result.Path)
|
||||
assert.Equal(t, "Updated content", result.Value)
|
||||
assert.Equal(t, 2, result.Seq)
|
||||
}
|
||||
|
||||
func TestEventProcessing_DeleteItem(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
// Create item first
|
||||
createEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Test content",
|
||||
}
|
||||
_, err := mockStore.ProcessEvent(createEvent)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Soft delete item
|
||||
deleteEvent := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/deleted_at",
|
||||
Value: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(deleteEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/deleted_at", result.Path)
|
||||
assert.NotEmpty(t, result.Value)
|
||||
assert.Equal(t, 2, result.Seq)
|
||||
}
|
||||
|
||||
func TestEventProcessing_ComplexOperations(t *testing.T) {
|
||||
mockStore := &MockEventStore{}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
operation string
|
||||
path string
|
||||
value string
|
||||
from string
|
||||
}{
|
||||
{"add priority", "add", "/priority", "high", ""},
|
||||
{"replace content", "replace", "/content", "new content", ""},
|
||||
{"move field", "move", "/title", "", "/content"},
|
||||
{"copy field", "copy", "/backup_content", "", "/content"},
|
||||
{"remove field", "remove", "/priority", "", ""},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
event := &Event{
|
||||
ItemID: "test123",
|
||||
Collection: "shopping_items",
|
||||
Operation: tt.operation,
|
||||
Path: tt.path,
|
||||
Value: tt.value,
|
||||
From: tt.from,
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test123", result.ItemID)
|
||||
assert.Equal(t, tt.operation, result.Operation)
|
||||
assert.Equal(t, tt.path, result.Path)
|
||||
assert.Equal(t, i+1, result.Seq)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventSerialization(t *testing.T) {
|
||||
event := Event{
|
||||
Seq: 42,
|
||||
ItemID: "item-456",
|
||||
EventID: "event-789",
|
||||
Collection: "test_collection",
|
||||
Operation: "replace",
|
||||
Path: "/status",
|
||||
Value: "completed",
|
||||
From: "/old_status",
|
||||
Timestamp: time.Date(2025, 3, 15, 14, 30, 0, 0, time.UTC),
|
||||
}
|
||||
|
||||
serialized := event.serialize()
|
||||
|
||||
// Verify all components are present
|
||||
assert.Contains(t, serialized, "seq:42")
|
||||
assert.Contains(t, serialized, "item_id:item-456")
|
||||
assert.Contains(t, serialized, "event_id:event-789")
|
||||
assert.Contains(t, serialized, "collection:test_collection")
|
||||
assert.Contains(t, serialized, "operation:replace")
|
||||
assert.Contains(t, serialized, "path:/status")
|
||||
assert.Contains(t, serialized, "value:completed")
|
||||
assert.Contains(t, serialized, "from:/old_status")
|
||||
assert.Contains(t, serialized, "timestamp:2025-03-15T14:30:00Z")
|
||||
}
|
||||
|
||||
func TestPatchOperationSerialization(t *testing.T) {
|
||||
op := PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/test",
|
||||
Value: "test value",
|
||||
From: "/source",
|
||||
}
|
||||
|
||||
// Test that PatchOperation can be used in JSON operations
|
||||
assert.Equal(t, "add", op.Op)
|
||||
assert.Equal(t, "/test", op.Path)
|
||||
assert.Equal(t, "test value", op.Value)
|
||||
assert.Equal(t, "/source", op.From)
|
||||
}
|
||||
|
||||
func TestSyncRequest_Response(t *testing.T) {
|
||||
// Test SyncRequest
|
||||
req := SyncRequest{
|
||||
LastSeq: 10,
|
||||
LastHash: "abc123hash",
|
||||
}
|
||||
|
||||
assert.Equal(t, 10, req.LastSeq)
|
||||
assert.Equal(t, "abc123hash", req.LastHash)
|
||||
|
||||
// Test SyncResponse
|
||||
events := []Event{
|
||||
{
|
||||
Seq: 11,
|
||||
ItemID: "item1",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "new item",
|
||||
},
|
||||
{
|
||||
Seq: 12,
|
||||
ItemID: "item2",
|
||||
Operation: "replace",
|
||||
Path: "/status",
|
||||
Value: "updated",
|
||||
},
|
||||
}
|
||||
|
||||
resp := SyncResponse{
|
||||
Events: events,
|
||||
CurrentSeq: 12,
|
||||
CurrentHash: "def456hash",
|
||||
FullSync: false,
|
||||
}
|
||||
|
||||
assert.Len(t, resp.Events, 2)
|
||||
assert.Equal(t, 12, resp.CurrentSeq)
|
||||
assert.Equal(t, "def456hash", resp.CurrentHash)
|
||||
assert.False(t, resp.FullSync)
|
||||
|
||||
// Verify event details
|
||||
assert.Equal(t, "add", resp.Events[0].Operation)
|
||||
assert.Equal(t, "replace", resp.Events[1].Operation)
|
||||
}
|
269
example_client.go
Normal file
269
example_client.go
Normal file
@@ -0,0 +1,269 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ExampleClient demonstrates how to interact with the event store API
|
||||
type ExampleClient struct {
|
||||
baseURL string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
func NewExampleClient(baseURL string) *ExampleClient {
|
||||
return &ExampleClient{
|
||||
baseURL: baseURL,
|
||||
client: &http.Client{Timeout: 10 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ExampleClient) CreateItem(itemID, content string) error {
|
||||
// Create item using JSON Patch "add" operations
|
||||
patches := []PatchOperation{
|
||||
{Op: "add", Path: "/content", Value: content},
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, patches)
|
||||
}
|
||||
|
||||
func (c *ExampleClient) UpdateItem(itemID, content string) error {
|
||||
// Update item using JSON Patch "replace" operation
|
||||
patches := []PatchOperation{
|
||||
{Op: "replace", Path: "/content", Value: content},
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, patches)
|
||||
}
|
||||
|
||||
func (c *ExampleClient) DeleteItem(itemID string) error {
|
||||
// Delete item using JSON Patch "add" operation for deleted_at
|
||||
patches := []PatchOperation{
|
||||
{Op: "add", Path: "/deleted_at", Value: time.Now()},
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, patches)
|
||||
}
|
||||
|
||||
// sendPatch sends JSON Patch operations using the PATCH method
|
||||
func (c *ExampleClient) sendPatch(collection, itemID string, patches []PatchOperation) error {
|
||||
jsonData, err := json.Marshal(patches)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal patches: %w", err)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/api/collections/%s/items/%s", c.baseURL, collection, itemID)
|
||||
req, err := http.NewRequest("PATCH", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send patch: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("patch failed: %s", string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendEvent sends event using the legacy POST method
|
||||
func (c *ExampleClient) sendEvent(event Event) error {
|
||||
jsonData, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal event: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.client.Post(c.baseURL+"/api/events", "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send event: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("event creation failed: %s", string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ExampleClient) SyncEvents(lastSeq int, lastHash string) (*SyncResponse, error) {
|
||||
syncReq := SyncRequest{
|
||||
LastSeq: lastSeq,
|
||||
LastHash: lastHash,
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(syncReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal sync request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.client.Post(c.baseURL+"/api/sync", "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to sync: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("sync failed: %s", string(body))
|
||||
}
|
||||
|
||||
var syncResp SyncResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&syncResp); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode sync response: %w", err)
|
||||
}
|
||||
|
||||
return &syncResp, nil
|
||||
}
|
||||
|
||||
func (c *ExampleClient) GetItems(collection string) ([]map[string]interface{}, error) {
|
||||
resp, err := c.client.Get(c.baseURL + "/api/items/" + collection)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get items: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("get items failed: %s", string(body))
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
items, ok := result["items"].([]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid response format")
|
||||
}
|
||||
|
||||
var typedItems []map[string]interface{}
|
||||
for _, item := range items {
|
||||
if typedItem, ok := item.(map[string]interface{}); ok {
|
||||
typedItems = append(typedItems, typedItem)
|
||||
}
|
||||
}
|
||||
|
||||
return typedItems, nil
|
||||
}
|
||||
|
||||
// Advanced JSON Patch operations examples
|
||||
func (c *ExampleClient) AdvancedPatchExample(itemID string) error {
|
||||
// Example: Create item with multiple fields
|
||||
patches := []PatchOperation{
|
||||
{Op: "add", Path: "/content", Value: "Advanced Shopping Item"},
|
||||
{Op: "add", Path: "/quantity", Value: 5},
|
||||
{Op: "add", Path: "/tags", Value: []string{"grocery", "urgent"}},
|
||||
{Op: "add", Path: "/metadata", Value: map[string]interface{}{
|
||||
"store": "SuperMart",
|
||||
"category": "food",
|
||||
}},
|
||||
}
|
||||
|
||||
if err := c.sendPatch("shopping_items", itemID, patches); err != nil {
|
||||
return fmt.Errorf("failed to create advanced item: %w", err)
|
||||
}
|
||||
|
||||
// Example: Complex updates using different operations
|
||||
updatePatches := []PatchOperation{
|
||||
{Op: "replace", Path: "/quantity", Value: 3}, // Update quantity
|
||||
{Op: "add", Path: "/tags/-", Value: "sale"}, // Append to array
|
||||
{Op: "replace", Path: "/metadata/store", Value: "MegaMart"}, // Update nested field
|
||||
{Op: "add", Path: "/metadata/priority", Value: "high"}, // Add new nested field
|
||||
{Op: "remove", Path: "/tags/0"}, // Remove first tag
|
||||
}
|
||||
|
||||
return c.sendPatch("shopping_items", itemID, updatePatches)
|
||||
}
|
||||
|
||||
// Example usage function
|
||||
func runClientExample() {
|
||||
client := NewExampleClient("http://localhost:8090")
|
||||
|
||||
fmt.Println("=== RFC6902 JSON Patch Event Store Example ===")
|
||||
|
||||
// Create some shopping items using JSON Patch
|
||||
fmt.Println("Creating shopping items with JSON Patch...")
|
||||
if err := client.CreateItem("item1", "Milk"); err != nil {
|
||||
fmt.Printf("Error creating item1: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := client.CreateItem("item2", "Bread"); err != nil {
|
||||
fmt.Printf("Error creating item2: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Update an item using JSON Patch replace
|
||||
fmt.Println("Updating item1 with JSON Patch replace...")
|
||||
if err := client.UpdateItem("item1", "Organic Milk"); err != nil {
|
||||
fmt.Printf("Error updating item1: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Advanced JSON Patch operations
|
||||
fmt.Println("Demonstrating advanced JSON Patch operations...")
|
||||
if err := client.AdvancedPatchExample("item3"); err != nil {
|
||||
fmt.Printf("Error with advanced patch example: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Get current items
|
||||
fmt.Println("Fetching current items...")
|
||||
items, err := client.GetItems("shopping_items")
|
||||
if err != nil {
|
||||
fmt.Printf("Error getting items: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Current items: %+v\n", items)
|
||||
|
||||
// Sync events (simulate client sync)
|
||||
fmt.Println("Syncing events...")
|
||||
syncResp, err := client.SyncEvents(0, "")
|
||||
if err != nil {
|
||||
fmt.Printf("Error syncing: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Sync response - Found %d events\n", len(syncResp.Events))
|
||||
|
||||
// Soft delete an item using JSON Patch
|
||||
fmt.Println("Soft deleting item2 with JSON Patch...")
|
||||
if err := client.DeleteItem("item2"); err != nil {
|
||||
fmt.Printf("Error deleting item2: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Get items again to see the change
|
||||
fmt.Println("Fetching items after deletion...")
|
||||
items, err = client.GetItems("shopping_items")
|
||||
if err != nil {
|
||||
fmt.Printf("Error getting items: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Items after deletion: %+v\n", items)
|
||||
fmt.Println("=== JSON Patch Example completed ===")
|
||||
}
|
||||
|
||||
// Uncomment the following to run the example:
|
||||
// func init() {
|
||||
// go func() {
|
||||
// time.Sleep(2 * time.Second) // Wait for server to start
|
||||
// runClientExample()
|
||||
// }()
|
||||
// }
|
2
go.mod
2
go.mod
@@ -7,6 +7,7 @@ toolchain go1.24.7
|
||||
require (
|
||||
git.site.quack-lab.dev/dave/cylogger v1.4.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/pocketbase/dbx v1.11.0
|
||||
github.com/pocketbase/pocketbase v0.30.0
|
||||
github.com/stretchr/testify v1.4.0
|
||||
)
|
||||
@@ -29,7 +30,6 @@ require (
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/pocketbase/dbx v1.11.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/spf13/cast v1.9.2 // indirect
|
||||
github.com/spf13/cobra v1.10.1 // indirect
|
||||
|
606
integration_test.go
Normal file
606
integration_test.go
Normal file
@@ -0,0 +1,606 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// MockIntegrationEventStore for integration testing
|
||||
type MockIntegrationEventStore struct {
|
||||
events []Event
|
||||
items map[string]map[string]interface{}
|
||||
nextSeq int
|
||||
patcher *JSONPatcher
|
||||
syncValid bool
|
||||
forceError bool
|
||||
}
|
||||
|
||||
func NewMockIntegrationEventStore() *MockIntegrationEventStore {
|
||||
return &MockIntegrationEventStore{
|
||||
events: []Event{},
|
||||
items: make(map[string]map[string]interface{}),
|
||||
nextSeq: 1,
|
||||
patcher: &JSONPatcher{},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) GetLatestEvent() (*Event, error) {
|
||||
if len(m.events) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return &m.events[len(m.events)-1], nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) ProcessEvent(incomingEvent *Event) (*Event, error) {
|
||||
if m.forceError {
|
||||
return nil, assert.AnError
|
||||
}
|
||||
|
||||
// Create processed event
|
||||
event := &Event{
|
||||
Seq: m.nextSeq,
|
||||
Hash: "hash_" + string(rune(m.nextSeq+'0')),
|
||||
ItemID: incomingEvent.ItemID,
|
||||
EventID: "event_" + string(rune(m.nextSeq+'0')),
|
||||
Collection: incomingEvent.Collection,
|
||||
Operation: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Apply event to items using JSON Patch
|
||||
itemKey := incomingEvent.Collection + ":" + incomingEvent.ItemID
|
||||
if m.items[itemKey] == nil {
|
||||
m.items[itemKey] = map[string]interface{}{
|
||||
"id": incomingEvent.ItemID,
|
||||
}
|
||||
}
|
||||
|
||||
// Create patch operation and apply it
|
||||
patch := PatchOperation{
|
||||
Op: incomingEvent.Operation,
|
||||
Path: incomingEvent.Path,
|
||||
Value: incomingEvent.Value,
|
||||
From: incomingEvent.From,
|
||||
}
|
||||
|
||||
updatedItem, err := m.patcher.ApplyPatches(m.items[itemKey], []PatchOperation{patch})
|
||||
if err != nil {
|
||||
return nil, err // Return the actual JSON patch error
|
||||
}
|
||||
m.items[itemKey] = updatedItem
|
||||
|
||||
m.events = append(m.events, *event)
|
||||
m.nextSeq++
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) GetEventsSince(seq int) ([]Event, error) {
|
||||
var events []Event
|
||||
for _, event := range m.events {
|
||||
if event.Seq > seq {
|
||||
events = append(events, event)
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) GetAllItems(collection string) ([]map[string]interface{}, error) {
|
||||
var items []map[string]interface{}
|
||||
for key, item := range m.items {
|
||||
if len(key) >= len(collection) && key[:len(collection)] == collection {
|
||||
// Skip soft-deleted items
|
||||
if deletedAt, exists := item["deleted_at"]; !exists || deletedAt == "" {
|
||||
items = append(items, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (m *MockIntegrationEventStore) ValidateSync(seq int, hash string) (bool, error) {
|
||||
if m.forceError {
|
||||
return false, assert.AnError
|
||||
}
|
||||
return m.syncValid, nil
|
||||
}
|
||||
|
||||
func TestFullWorkflow(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
|
||||
t.Run("1. Create initial item using JSON Patch", func(t *testing.T) {
|
||||
event := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Initial workflow item",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, result.Seq)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/content", result.Path)
|
||||
assert.Equal(t, "Initial workflow item", result.Value)
|
||||
})
|
||||
|
||||
t.Run("2. Update item with multiple operations", func(t *testing.T) {
|
||||
// Add priority
|
||||
event1 := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/priority",
|
||||
Value: "high",
|
||||
}
|
||||
result1, err := mockStore.ProcessEvent(event1)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, result1.Seq)
|
||||
|
||||
// Update content
|
||||
event2 := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "Updated workflow item",
|
||||
}
|
||||
result2, err := mockStore.ProcessEvent(event2)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, result2.Seq)
|
||||
})
|
||||
|
||||
t.Run("3. Create second item", func(t *testing.T) {
|
||||
event := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Second workflow item",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 4, result.Seq)
|
||||
})
|
||||
|
||||
t.Run("4. Get all items", func(t *testing.T) {
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 2)
|
||||
|
||||
// Find first item and verify its state
|
||||
var item1 map[string]interface{}
|
||||
for _, item := range items {
|
||||
if item["id"] == "workflow_item_001" {
|
||||
item1 = item
|
||||
break
|
||||
}
|
||||
}
|
||||
require.NotNil(t, item1)
|
||||
assert.Equal(t, "Updated workflow item", item1["content"])
|
||||
assert.Equal(t, "high", item1["priority"])
|
||||
})
|
||||
|
||||
t.Run("5. Test client synchronization", func(t *testing.T) {
|
||||
// Test getting events since sequence 2
|
||||
events, err := mockStore.GetEventsSince(2)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, events, 2) // events 3 and 4
|
||||
|
||||
// Verify latest event
|
||||
latestEvent, err := mockStore.GetLatestEvent()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 4, latestEvent.Seq)
|
||||
|
||||
// Test sync validation
|
||||
mockStore.syncValid = true
|
||||
isValid, err := mockStore.ValidateSync(4, "hash_4")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, isValid)
|
||||
})
|
||||
|
||||
t.Run("6. Soft delete an item", func(t *testing.T) {
|
||||
deleteEvent := &Event{
|
||||
ItemID: "workflow_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/deleted_at",
|
||||
Value: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(deleteEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, result.Seq)
|
||||
assert.Equal(t, "add", result.Operation)
|
||||
assert.Equal(t, "/deleted_at", result.Path)
|
||||
|
||||
// Verify item is now filtered out
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 1) // Only one item remaining
|
||||
|
||||
// Verify remaining item is item_002
|
||||
assert.Equal(t, "workflow_item_002", items[0]["id"])
|
||||
})
|
||||
|
||||
t.Run("7. Complex nested operations", func(t *testing.T) {
|
||||
// Add metadata object
|
||||
metadataEvent := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/metadata",
|
||||
Value: `{"created_by": "system", "version": 1}`,
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(metadataEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 6, result.Seq)
|
||||
|
||||
// Skip nested field update since we store JSON as strings
|
||||
// The metadata is stored as a JSON string, not a parsed object
|
||||
})
|
||||
|
||||
t.Run("8. Test operation on nested data", func(t *testing.T) {
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 1)
|
||||
|
||||
item := items[0]
|
||||
assert.Equal(t, "workflow_item_002", item["id"])
|
||||
assert.Equal(t, "Second workflow item", item["content"])
|
||||
|
||||
// Check nested metadata (stored as JSON string)
|
||||
metadataStr, ok := item["metadata"].(string)
|
||||
require.True(t, ok)
|
||||
assert.Contains(t, metadataStr, "system")
|
||||
assert.Contains(t, metadataStr, "1") // Original value
|
||||
})
|
||||
|
||||
t.Run("9. Test move operation", func(t *testing.T) {
|
||||
moveEvent := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "move",
|
||||
From: "/content",
|
||||
Path: "/title",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(moveEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 7, result.Seq)
|
||||
|
||||
// Verify move operation
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
item := items[0]
|
||||
|
||||
assert.Equal(t, "Second workflow item", item["title"])
|
||||
assert.Nil(t, item["content"]) // Should be moved, not copied
|
||||
})
|
||||
|
||||
t.Run("10. Test copy operation", func(t *testing.T) {
|
||||
copyEvent := &Event{
|
||||
ItemID: "workflow_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "copy",
|
||||
From: "/title",
|
||||
Path: "/backup_title",
|
||||
}
|
||||
|
||||
result, err := mockStore.ProcessEvent(copyEvent)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 8, result.Seq)
|
||||
|
||||
// Verify copy operation
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
item := items[0]
|
||||
|
||||
assert.Equal(t, "Second workflow item", item["title"])
|
||||
assert.Equal(t, "Second workflow item", item["backup_title"])
|
||||
})
|
||||
|
||||
t.Run("11. Final state verification", func(t *testing.T) {
|
||||
// Verify final event count
|
||||
latestEvent, err := mockStore.GetLatestEvent()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 8, latestEvent.Seq)
|
||||
|
||||
// Verify all events are recorded
|
||||
allEvents, err := mockStore.GetEventsSince(0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, allEvents, 8)
|
||||
|
||||
// Verify final item state
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 1)
|
||||
|
||||
finalItem := items[0]
|
||||
assert.Equal(t, "workflow_item_002", finalItem["id"])
|
||||
assert.Equal(t, "Second workflow item", finalItem["title"])
|
||||
assert.Equal(t, "Second workflow item", finalItem["backup_title"])
|
||||
assert.Nil(t, finalItem["content"]) // Moved to title
|
||||
|
||||
metadataStr, ok := finalItem["metadata"].(string)
|
||||
require.True(t, ok)
|
||||
assert.Contains(t, metadataStr, "system")
|
||||
assert.Contains(t, metadataStr, "1")
|
||||
})
|
||||
}
|
||||
|
||||
func TestErrorScenarios(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
setup func(*MockIntegrationEventStore)
|
||||
event Event
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Invalid JSON Patch operation",
|
||||
setup: func(m *MockIntegrationEventStore) {},
|
||||
event: Event{
|
||||
ItemID: "error_test_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "invalid_op",
|
||||
Path: "/content",
|
||||
Value: "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Test operation failure",
|
||||
setup: func(m *MockIntegrationEventStore) {
|
||||
// Create item first
|
||||
m.items["shopping_items:test_item"] = map[string]interface{}{
|
||||
"id": "test_item",
|
||||
"content": "original",
|
||||
}
|
||||
},
|
||||
event: Event{
|
||||
ItemID: "test_item",
|
||||
Collection: "shopping_items",
|
||||
Operation: "test",
|
||||
Path: "/content",
|
||||
Value: "different", // This should fail the test
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Remove non-existent field",
|
||||
setup: func(m *MockIntegrationEventStore) {
|
||||
m.items["shopping_items:test_item"] = map[string]interface{}{
|
||||
"id": "test_item",
|
||||
}
|
||||
},
|
||||
event: Event{
|
||||
ItemID: "test_item",
|
||||
Collection: "shopping_items",
|
||||
Operation: "remove",
|
||||
Path: "/nonexistent",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Replace non-existent field",
|
||||
setup: func(m *MockIntegrationEventStore) {
|
||||
m.items["shopping_items:test_item"] = map[string]interface{}{
|
||||
"id": "test_item",
|
||||
}
|
||||
},
|
||||
event: Event{
|
||||
ItemID: "test_item",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/nonexistent",
|
||||
Value: "value",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
tt.setup(mockStore)
|
||||
|
||||
_, err := mockStore.ProcessEvent(&tt.event)
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncScenarios(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
|
||||
// Create some events
|
||||
events := []*Event{
|
||||
{
|
||||
ItemID: "sync_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "First item",
|
||||
},
|
||||
{
|
||||
ItemID: "sync_item_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Second item",
|
||||
},
|
||||
{
|
||||
ItemID: "sync_item_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/content",
|
||||
Value: "Updated first item",
|
||||
},
|
||||
}
|
||||
|
||||
// Process all events
|
||||
for _, event := range events {
|
||||
_, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Run("Full sync from beginning", func(t *testing.T) {
|
||||
events, err := mockStore.GetEventsSince(0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, events, 3)
|
||||
|
||||
// Verify event sequence
|
||||
for i, event := range events {
|
||||
assert.Equal(t, i+1, event.Seq)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Incremental sync", func(t *testing.T) {
|
||||
events, err := mockStore.GetEventsSince(1)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, events, 2)
|
||||
|
||||
// Should get events 2 and 3
|
||||
assert.Equal(t, 2, events[0].Seq)
|
||||
assert.Equal(t, 3, events[1].Seq)
|
||||
})
|
||||
|
||||
t.Run("Sync validation", func(t *testing.T) {
|
||||
// Test valid sync
|
||||
mockStore.syncValid = true
|
||||
isValid, err := mockStore.ValidateSync(3, "hash_3")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, isValid)
|
||||
|
||||
// Test invalid sync
|
||||
mockStore.syncValid = false
|
||||
isValid, err = mockStore.ValidateSync(2, "wrong_hash")
|
||||
require.NoError(t, err)
|
||||
assert.False(t, isValid)
|
||||
})
|
||||
|
||||
t.Run("Get latest event", func(t *testing.T) {
|
||||
latest, err := mockStore.GetLatestEvent()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, latest.Seq)
|
||||
assert.Equal(t, "sync_item_001", latest.ItemID)
|
||||
assert.Equal(t, "replace", latest.Operation)
|
||||
})
|
||||
}
|
||||
|
||||
func TestConcurrentOperations(t *testing.T) {
|
||||
mockStore := NewMockIntegrationEventStore()
|
||||
|
||||
// Simulate concurrent operations on different items
|
||||
t.Run("Multiple items concurrent creation", func(t *testing.T) {
|
||||
events := []*Event{
|
||||
{
|
||||
ItemID: "concurrent_001",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Concurrent item 1",
|
||||
},
|
||||
{
|
||||
ItemID: "concurrent_002",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Concurrent item 2",
|
||||
},
|
||||
{
|
||||
ItemID: "concurrent_003",
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Concurrent item 3",
|
||||
},
|
||||
}
|
||||
|
||||
// Process events sequentially (simulating concurrent processing)
|
||||
var results []*Event
|
||||
for _, event := range events {
|
||||
result, err := mockStore.ProcessEvent(event)
|
||||
require.NoError(t, err)
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
// Verify all events were processed with proper sequence numbers
|
||||
for i, result := range results {
|
||||
assert.Equal(t, i+1, result.Seq)
|
||||
}
|
||||
|
||||
// Verify all items exist
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, items, 3)
|
||||
})
|
||||
|
||||
t.Run("Same item multiple operations", func(t *testing.T) {
|
||||
itemID := "concurrent_same_item"
|
||||
operations := []*Event{
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/content",
|
||||
Value: "Initial content",
|
||||
},
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/priority",
|
||||
Value: "low",
|
||||
},
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "replace",
|
||||
Path: "/priority",
|
||||
Value: "high",
|
||||
},
|
||||
{
|
||||
ItemID: itemID,
|
||||
Collection: "shopping_items",
|
||||
Operation: "add",
|
||||
Path: "/tags",
|
||||
Value: `["urgent", "important"]`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, op := range operations {
|
||||
_, err := mockStore.ProcessEvent(op)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Verify final state
|
||||
items, err := mockStore.GetAllItems("shopping_items")
|
||||
require.NoError(t, err)
|
||||
|
||||
var targetItem map[string]interface{}
|
||||
for _, item := range items {
|
||||
if item["id"] == itemID {
|
||||
targetItem = item
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.NotNil(t, targetItem)
|
||||
assert.Equal(t, "Initial content", targetItem["content"])
|
||||
assert.Equal(t, "high", targetItem["priority"])
|
||||
assert.NotNil(t, targetItem["tags"])
|
||||
})
|
||||
}
|
299
json_patch.go
Normal file
299
json_patch.go
Normal file
@@ -0,0 +1,299 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// JSONPatcher implements RFC6902 JSON Patch operations
|
||||
type JSONPatcher struct{}
|
||||
|
||||
// ApplyPatches applies a series of JSON Patch operations to a document
|
||||
func (jp *JSONPatcher) ApplyPatches(doc map[string]interface{}, patches []PatchOperation) (map[string]interface{}, error) {
|
||||
// Apply patches in-place to avoid memory duplication
|
||||
for i, patch := range patches {
|
||||
var err error
|
||||
doc, err = jp.applyPatch(doc, patch)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to apply patch %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyPatch applies a single JSON Patch operation
|
||||
func (jp *JSONPatcher) applyPatch(doc map[string]interface{}, patch PatchOperation) (map[string]interface{}, error) {
|
||||
switch patch.Op {
|
||||
case "add":
|
||||
return jp.applyAdd(doc, patch.Path, patch.Value)
|
||||
case "remove":
|
||||
return jp.applyRemove(doc, patch.Path)
|
||||
case "replace":
|
||||
return jp.applyReplace(doc, patch.Path, patch.Value)
|
||||
case "move":
|
||||
return jp.applyMove(doc, patch.From, patch.Path)
|
||||
case "copy":
|
||||
return jp.applyCopy(doc, patch.From, patch.Path)
|
||||
case "test":
|
||||
return jp.applyTest(doc, patch.Path, patch.Value)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported operation: %s", patch.Op)
|
||||
}
|
||||
}
|
||||
|
||||
// applyAdd implements the "add" operation
|
||||
func (jp *JSONPatcher) applyAdd(doc map[string]interface{}, path string, value interface{}) (map[string]interface{}, error) {
|
||||
if path == "" {
|
||||
// Adding to root replaces entire document
|
||||
if newDoc, ok := value.(map[string]interface{}); ok {
|
||||
return newDoc, nil
|
||||
}
|
||||
return nil, fmt.Errorf("cannot replace root with non-object")
|
||||
}
|
||||
|
||||
parts := jp.parsePath(path)
|
||||
if len(parts) == 0 {
|
||||
return nil, fmt.Errorf("invalid path: %s", path)
|
||||
}
|
||||
|
||||
// Navigate to parent and add the value
|
||||
parent, key, err := jp.navigateToParent(doc, parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if parent == nil {
|
||||
return nil, fmt.Errorf("parent does not exist for path: %s", path)
|
||||
}
|
||||
|
||||
if parentMap, ok := parent.(map[string]interface{}); ok {
|
||||
parentMap[key] = value
|
||||
} else if parentSlice, ok := parent.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(key)
|
||||
if err != nil {
|
||||
if key == "-" {
|
||||
// Append to end - need to modify the parent reference
|
||||
return nil, fmt.Errorf("array append operation not fully supported in this simplified implementation")
|
||||
} else {
|
||||
return nil, fmt.Errorf("invalid array index: %s", key)
|
||||
}
|
||||
} else {
|
||||
// Insert at index - simplified implementation
|
||||
if index < 0 || index > len(parentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
// For simplicity, we'll replace at index if it exists, or error if beyond bounds
|
||||
if index < len(parentSlice) {
|
||||
parentSlice[index] = value
|
||||
} else {
|
||||
return nil, fmt.Errorf("array insertion beyond bounds not supported in simplified implementation")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot add to non-object/non-array")
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyRemove implements the "remove" operation
|
||||
func (jp *JSONPatcher) applyRemove(doc map[string]interface{}, path string) (map[string]interface{}, error) {
|
||||
parts := jp.parsePath(path)
|
||||
if len(parts) == 0 {
|
||||
return nil, fmt.Errorf("cannot remove root")
|
||||
}
|
||||
|
||||
parent, key, err := jp.navigateToParent(doc, parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if parentMap, ok := parent.(map[string]interface{}); ok {
|
||||
if _, exists := parentMap[key]; !exists {
|
||||
return nil, fmt.Errorf("path does not exist: %s", path)
|
||||
}
|
||||
delete(parentMap, key)
|
||||
} else if parentSlice, ok := parent.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid array index: %s", key)
|
||||
}
|
||||
if index < 0 || index >= len(parentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
// Simplified remove - set to nil instead of actually removing
|
||||
parentSlice[index] = nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot remove from non-object/non-array")
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyReplace implements the "replace" operation
|
||||
func (jp *JSONPatcher) applyReplace(doc map[string]interface{}, path string, value interface{}) (map[string]interface{}, error) {
|
||||
if path == "" {
|
||||
// Replace entire document
|
||||
if newDoc, ok := value.(map[string]interface{}); ok {
|
||||
return newDoc, nil
|
||||
}
|
||||
return nil, fmt.Errorf("cannot replace root with non-object")
|
||||
}
|
||||
|
||||
parts := jp.parsePath(path)
|
||||
parent, key, err := jp.navigateToParent(doc, parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if parentMap, ok := parent.(map[string]interface{}); ok {
|
||||
if _, exists := parentMap[key]; !exists {
|
||||
return nil, fmt.Errorf("path does not exist: %s", path)
|
||||
}
|
||||
parentMap[key] = value
|
||||
} else if parentSlice, ok := parent.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid array index: %s", key)
|
||||
}
|
||||
if index < 0 || index >= len(parentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
parentSlice[index] = value
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot replace in non-object/non-array")
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyMove implements the "move" operation
|
||||
func (jp *JSONPatcher) applyMove(doc map[string]interface{}, from, to string) (map[string]interface{}, error) {
|
||||
// Get value from source
|
||||
value, err := jp.getValue(doc, from)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("move source not found: %w", err)
|
||||
}
|
||||
|
||||
// Remove from source
|
||||
doc, err = jp.applyRemove(doc, from)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to remove from source: %w", err)
|
||||
}
|
||||
|
||||
// Add to destination
|
||||
doc, err = jp.applyAdd(doc, to, value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to add to destination: %w", err)
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyCopy implements the "copy" operation
|
||||
func (jp *JSONPatcher) applyCopy(doc map[string]interface{}, from, to string) (map[string]interface{}, error) {
|
||||
// Get value from source
|
||||
value, err := jp.getValue(doc, from)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("copy source not found: %w", err)
|
||||
}
|
||||
|
||||
// Add to destination
|
||||
doc, err = jp.applyAdd(doc, to, value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to add to destination: %w", err)
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// applyTest implements the "test" operation
|
||||
func (jp *JSONPatcher) applyTest(doc map[string]interface{}, path string, expectedValue interface{}) (map[string]interface{}, error) {
|
||||
actualValue, err := jp.getValue(doc, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("test path not found: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(actualValue, expectedValue) {
|
||||
return nil, fmt.Errorf("test failed: expected %v, got %v", expectedValue, actualValue)
|
||||
}
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
// getValue retrieves a value at the given JSON Pointer path
|
||||
func (jp *JSONPatcher) getValue(doc map[string]interface{}, path string) (interface{}, error) {
|
||||
if path == "" {
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
parts := jp.parsePath(path)
|
||||
current := interface{}(doc)
|
||||
|
||||
for _, part := range parts {
|
||||
if currentMap, ok := current.(map[string]interface{}); ok {
|
||||
var exists bool
|
||||
current, exists = currentMap[part]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("path not found: %s", path)
|
||||
}
|
||||
} else if currentSlice, ok := current.([]interface{}); ok {
|
||||
index, err := strconv.Atoi(part)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid array index: %s", part)
|
||||
}
|
||||
if index < 0 || index >= len(currentSlice) {
|
||||
return nil, fmt.Errorf("array index out of bounds: %d", index)
|
||||
}
|
||||
current = currentSlice[index]
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot navigate through non-object/non-array")
|
||||
}
|
||||
}
|
||||
|
||||
return current, nil
|
||||
}
|
||||
|
||||
// navigateToParent navigates to the parent of the target path
|
||||
func (jp *JSONPatcher) navigateToParent(doc map[string]interface{}, parts []string) (interface{}, string, error) {
|
||||
if len(parts) == 0 {
|
||||
return nil, "", fmt.Errorf("no parent for root")
|
||||
}
|
||||
|
||||
if len(parts) == 1 {
|
||||
return doc, parts[0], nil
|
||||
}
|
||||
|
||||
parentPath := "/" + strings.Join(parts[:len(parts)-1], "/")
|
||||
parent, err := jp.getValue(doc, parentPath)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return parent, parts[len(parts)-1], nil
|
||||
}
|
||||
|
||||
// parsePath parses a JSON Pointer path into parts
|
||||
func (jp *JSONPatcher) parsePath(path string) []string {
|
||||
if path == "" {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(path, "/") {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
parts := strings.Split(path[1:], "/")
|
||||
|
||||
// Unescape JSON Pointer characters
|
||||
for i, part := range parts {
|
||||
part = strings.ReplaceAll(part, "~1", "/")
|
||||
part = strings.ReplaceAll(part, "~0", "~")
|
||||
parts[i] = part
|
||||
}
|
||||
|
||||
return parts
|
||||
}
|
573
json_patch_test.go
Normal file
573
json_patch_test.go
Normal file
@@ -0,0 +1,573 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestJSONPatcher_Add(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "add simple field",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/content",
|
||||
Value: "test content",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add nested field",
|
||||
doc: map[string]interface{}{"id": "test", "meta": map[string]interface{}{}},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/meta/priority",
|
||||
Value: "high",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "high"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add to existing field (overwrite)",
|
||||
doc: map[string]interface{}{"id": "test", "content": "old"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/content",
|
||||
Value: "new",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "new",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add array field",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/tags",
|
||||
Value: []interface{}{"tag1", "tag2"},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"tags": []interface{}{"tag1", "tag2"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add complex object",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/metadata",
|
||||
Value: map[string]interface{}{
|
||||
"created": "2025-01-01",
|
||||
"version": 1,
|
||||
},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"metadata": map[string]interface{}{
|
||||
"created": "2025-01-01",
|
||||
"version": 1,
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Remove(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "remove simple field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "remove",
|
||||
Path: "/content",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove nested field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "high", "tags": []string{"tag1"}},
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "remove",
|
||||
Path: "/meta/priority",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"tags": []string{"tag1"}},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "remove",
|
||||
Path: "/nonexistent",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Replace(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "replace simple field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "old content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "replace",
|
||||
Path: "/content",
|
||||
Value: "new content",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "new content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "replace nested field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "low"},
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "replace",
|
||||
Path: "/meta/priority",
|
||||
Value: "high",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"meta": map[string]interface{}{"priority": "high"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "replace non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "replace",
|
||||
Path: "/nonexistent",
|
||||
Value: "value",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Test(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "test field success",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "test",
|
||||
Path: "/content",
|
||||
Value: "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test field failure",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "test",
|
||||
Path: "/content",
|
||||
Value: "different content",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "test non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "test",
|
||||
Path: "/nonexistent",
|
||||
Value: "value",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
_, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Move(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "move field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "move",
|
||||
From: "/content",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"title": "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "move non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "move",
|
||||
From: "/nonexistent",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Copy(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patch PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "copy field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "copy",
|
||||
From: "/content",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "test content",
|
||||
"title": "test content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "copy non-existent field",
|
||||
doc: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
patch: PatchOperation{
|
||||
Op: "copy",
|
||||
From: "/nonexistent",
|
||||
Path: "/title",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, []PatchOperation{tt.patch})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_Complex(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
doc map[string]interface{}
|
||||
patches []PatchOperation
|
||||
expected map[string]interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "multiple operations",
|
||||
doc: map[string]interface{}{"id": "test"},
|
||||
patches: []PatchOperation{
|
||||
{Op: "add", Path: "/content", Value: "test"},
|
||||
{Op: "add", Path: "/priority", Value: "high"},
|
||||
{Op: "replace", Path: "/content", Value: "updated test"},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"content": "updated test",
|
||||
"priority": "high",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test then modify",
|
||||
doc: map[string]interface{}{"id": "test", "version": 1},
|
||||
patches: []PatchOperation{
|
||||
{Op: "test", Path: "/version", Value: 1},
|
||||
{Op: "replace", Path: "/version", Value: 2},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"version": 2,
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "failed test stops execution",
|
||||
doc: map[string]interface{}{"id": "test", "version": 1},
|
||||
patches: []PatchOperation{
|
||||
{Op: "test", Path: "/version", Value: 2}, // This should fail
|
||||
{Op: "replace", Path: "/version", Value: 3},
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"id": "test",
|
||||
"version": 1,
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result, err := patcher.ApplyPatches(tt.doc, tt.patches)
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_ParsePath(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
path string
|
||||
expected []string
|
||||
}{
|
||||
{
|
||||
name: "empty path",
|
||||
path: "",
|
||||
expected: []string{},
|
||||
},
|
||||
{
|
||||
name: "root path",
|
||||
path: "/",
|
||||
expected: []string{""},
|
||||
},
|
||||
{
|
||||
name: "simple path",
|
||||
path: "/content",
|
||||
expected: []string{"content"},
|
||||
},
|
||||
{
|
||||
name: "nested path",
|
||||
path: "/meta/priority",
|
||||
expected: []string{"meta", "priority"},
|
||||
},
|
||||
{
|
||||
name: "path with escaped characters",
|
||||
path: "/content~1title",
|
||||
expected: []string{"content/title"},
|
||||
},
|
||||
{
|
||||
name: "path with escaped slash",
|
||||
path: "/content~0title",
|
||||
expected: []string{"content~title"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
result := patcher.parsePath(tt.path)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONPatcher_InvalidOperations(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
patch PatchOperation
|
||||
}{
|
||||
{
|
||||
name: "invalid operation",
|
||||
patch: PatchOperation{
|
||||
Op: "invalid",
|
||||
Path: "/content",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid path format",
|
||||
patch: PatchOperation{
|
||||
Op: "add",
|
||||
Path: "content", // missing leading slash
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
patcher := &JSONPatcher{}
|
||||
doc := map[string]interface{}{"id": "test"}
|
||||
_, err := patcher.ApplyPatches(doc, []PatchOperation{tt.patch})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
}
|
20
main.go
20
main.go
@@ -19,6 +19,26 @@ func main() {
|
||||
logger.Info("Starting server")
|
||||
app = pocketbase.New()
|
||||
|
||||
// Initialize event store
|
||||
eventStore := NewSimpleEventStore(app)
|
||||
|
||||
// Setup collections on startup
|
||||
app.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
||||
// Setup database tables
|
||||
logger.Info("Setting up database tables...")
|
||||
if err := setupCollections(app); err != nil {
|
||||
logger.Error("Failed to setup database tables: %v", err)
|
||||
} else {
|
||||
logger.Info("Database tables setup complete")
|
||||
}
|
||||
|
||||
return se.Next()
|
||||
})
|
||||
|
||||
// Setup API routes
|
||||
setupAPIRoutes(app, eventStore)
|
||||
|
||||
// Serve static files
|
||||
app.OnServe().BindFunc(func(se *core.ServeEvent) error {
|
||||
// serves static files from the provided public dir (if exists)
|
||||
se.Router.GET("/{path...}", apis.Static(os.DirFS("./pb_public"), false))
|
||||
|
146
merging.go
Normal file
146
merging.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pocketbase/pocketbase"
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
)
|
||||
|
||||
// MergeEventLog merges old events by resolving them into current state
|
||||
// and creating new create events with resolved data
|
||||
func (es *SimpleEventStore) MergeEventLog(cutoffDays int) error {
|
||||
cutoffTime := time.Now().AddDate(0, 0, -cutoffDays)
|
||||
|
||||
// Get all events older than cutoff
|
||||
oldEvents, err := es.app.FindRecordsByFilter("events", "timestamp < {:cutoff}", "seq", 10000, 0, map[string]any{"cutoff": cutoffTime})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get old events: %w", err)
|
||||
}
|
||||
|
||||
if len(oldEvents) == 0 {
|
||||
return nil // Nothing to merge
|
||||
}
|
||||
|
||||
// Get all collections that have events
|
||||
collections := make(map[string]bool)
|
||||
for _, event := range oldEvents {
|
||||
collections[event.GetString("collection")] = true
|
||||
}
|
||||
|
||||
// Get latest event to preserve sequence continuity
|
||||
latestEvent, err := es.GetLatestEvent()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get latest event: %w", err)
|
||||
}
|
||||
|
||||
nextSeq := 1
|
||||
prevHash := ""
|
||||
if latestEvent != nil {
|
||||
nextSeq = latestEvent.Seq + 1
|
||||
prevHash = latestEvent.Hash
|
||||
}
|
||||
|
||||
// For each collection, get current state and create consolidated create events
|
||||
for collectionName := range collections {
|
||||
items, err := es.GetAllItems(collectionName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get items for collection %s: %w", collectionName, err)
|
||||
}
|
||||
|
||||
// Create new create events for each existing item
|
||||
for _, item := range items {
|
||||
itemID, ok := item["id"].(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Create consolidated create event using JSON Patch "add" operations
|
||||
patches := []PatchOperation{}
|
||||
for key, value := range item {
|
||||
if key != "id" && key != "created_at" && key != "updated_at" { // Skip system fields
|
||||
patches = append(patches, PatchOperation{
|
||||
Op: "add",
|
||||
Path: "/" + key,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Rethink merging for single operations
|
||||
consolidatedEvent := &Event{
|
||||
Seq: nextSeq,
|
||||
ItemID: itemID,
|
||||
Collection: collectionName,
|
||||
Operation: "add", // Placeholder - merging needs redesign
|
||||
Path: "/",
|
||||
Value: "consolidated",
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Generate new event ID and hash
|
||||
consolidatedEvent.EventID = generateEventID()
|
||||
consolidatedEvent.Hash = consolidatedEvent.calculateHash(prevHash)
|
||||
|
||||
// Save the consolidated event
|
||||
if err := es.saveEvent(consolidatedEvent); err != nil {
|
||||
return fmt.Errorf("failed to save consolidated event: %w", err)
|
||||
}
|
||||
|
||||
nextSeq++
|
||||
prevHash = consolidatedEvent.Hash
|
||||
}
|
||||
}
|
||||
|
||||
// Archive old events (save to backup file)
|
||||
if err := es.archiveEvents(oldEvents); err != nil {
|
||||
return fmt.Errorf("failed to archive old events: %w", err)
|
||||
}
|
||||
|
||||
// Delete old events
|
||||
for _, event := range oldEvents {
|
||||
if err := es.app.Delete(event); err != nil {
|
||||
return fmt.Errorf("failed to delete old event: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// archiveEvents saves old events to a backup file
|
||||
func (es *SimpleEventStore) archiveEvents(events []*core.Record) error {
|
||||
// For now, just log that we would archive
|
||||
// In a real implementation, you'd save to a file with timestamp
|
||||
fmt.Printf("Would archive %d events to backup file with timestamp %s\n",
|
||||
len(events), time.Now().Format("2006-01-02_15-04-05"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// ScheduleEventMerging sets up periodic event log merging
|
||||
func (es *SimpleEventStore) ScheduleEventMerging(app *pocketbase.PocketBase) {
|
||||
// This would typically use a job scheduler
|
||||
// For this basic implementation, we'll just provide the method
|
||||
// In production, you'd use something like cron or a job queue
|
||||
|
||||
// Example of how you might call it:
|
||||
// go func() {
|
||||
// ticker := time.NewTicker(24 * time.Hour)
|
||||
// defer ticker.Stop()
|
||||
// for {
|
||||
// select {
|
||||
// case <-ticker.C:
|
||||
// if err := es.MergeEventLog(2); err != nil {
|
||||
// // Log error
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
}
|
||||
|
||||
// generateEventID generates a new UUID for events
|
||||
func generateEventID() string {
|
||||
// Import uuid package at the top of file
|
||||
// For now, return a placeholder
|
||||
return "generated-uuid"
|
||||
}
|
125
migrations.go
Normal file
125
migrations.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
"github.com/pocketbase/pocketbase/tools/types"
|
||||
)
|
||||
|
||||
// setupCollections creates the required PocketBase collections for the event store
|
||||
func setupCollections(app core.App) error {
|
||||
fmt.Println("Creating PocketBase collections...")
|
||||
|
||||
// Create events collection if it doesn't exist
|
||||
if _, err := app.FindCollectionByNameOrId("events"); err != nil {
|
||||
fmt.Println("Creating events collection...")
|
||||
|
||||
eventsCollection := core.NewBaseCollection("events")
|
||||
|
||||
// Admin only access for events
|
||||
eventsCollection.ListRule = nil
|
||||
eventsCollection.ViewRule = nil
|
||||
eventsCollection.CreateRule = nil
|
||||
eventsCollection.UpdateRule = nil
|
||||
eventsCollection.DeleteRule = nil
|
||||
|
||||
// Add fields - ONE EVENT = ONE OPERATION
|
||||
eventsCollection.Fields.Add(&core.NumberField{
|
||||
Name: "seq",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "hash",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "item_id",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "event_id",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "collection",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "operation",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "path",
|
||||
Required: true,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "value",
|
||||
Required: false,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.TextField{
|
||||
Name: "from",
|
||||
Required: false,
|
||||
})
|
||||
eventsCollection.Fields.Add(&core.DateField{
|
||||
Name: "timestamp",
|
||||
Required: true,
|
||||
})
|
||||
|
||||
// Add index on sequence number
|
||||
eventsCollection.AddIndex("idx_events_seq", false, "seq", "")
|
||||
|
||||
if err := app.Save(eventsCollection); err != nil {
|
||||
return fmt.Errorf("failed to create events collection: %w", err)
|
||||
}
|
||||
fmt.Println("✅ Events collection created successfully")
|
||||
} else {
|
||||
fmt.Println("Events collection already exists")
|
||||
}
|
||||
|
||||
// Create shopping_items collection if it doesn't exist
|
||||
if _, err := app.FindCollectionByNameOrId("shopping_items"); err != nil {
|
||||
fmt.Println("Creating shopping_items collection...")
|
||||
|
||||
itemsCollection := core.NewBaseCollection("shopping_items")
|
||||
|
||||
// Public access rules
|
||||
itemsCollection.ListRule = types.Pointer("")
|
||||
itemsCollection.ViewRule = types.Pointer("")
|
||||
itemsCollection.CreateRule = types.Pointer("")
|
||||
itemsCollection.UpdateRule = types.Pointer("")
|
||||
itemsCollection.DeleteRule = types.Pointer("")
|
||||
|
||||
// Add static fields only - no arbitrary fields allowed
|
||||
itemsCollection.Fields.Add(&core.TextField{
|
||||
Name: "content",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.TextField{
|
||||
Name: "priority",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.DateField{
|
||||
Name: "created_at",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.DateField{
|
||||
Name: "updated_at",
|
||||
Required: false,
|
||||
})
|
||||
itemsCollection.Fields.Add(&core.DateField{
|
||||
Name: "deleted_at",
|
||||
Required: false,
|
||||
})
|
||||
|
||||
if err := app.Save(itemsCollection); err != nil {
|
||||
return fmt.Errorf("failed to create shopping_items collection: %w", err)
|
||||
}
|
||||
fmt.Println("✅ Shopping_items collection created successfully")
|
||||
} else {
|
||||
fmt.Println("Shopping_items collection already exists")
|
||||
}
|
||||
|
||||
fmt.Println("✅ PocketBase collections setup complete")
|
||||
return nil
|
||||
}
|
54
test_api.sh
Normal file
54
test_api.sh
Normal file
@@ -0,0 +1,54 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Simple API Tests - ONE OPERATION PER REQUEST
|
||||
echo "🚀 Testing Simple Event Store API..."
|
||||
|
||||
BASE_URL="http://localhost:8090"
|
||||
|
||||
echo "1. Check server health"
|
||||
curl -s "$BASE_URL/api/health"
|
||||
echo
|
||||
|
||||
echo "2. Get current state"
|
||||
curl -s "$BASE_URL/api/state"
|
||||
echo
|
||||
|
||||
echo "3. Create an item - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"add\", \"path\": \"/content\", \"value\": \"My test item\"}"
|
||||
echo
|
||||
|
||||
echo "4. Update the item - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"replace\", \"path\": \"/content\", \"value\": \"Updated test item\"}"
|
||||
echo
|
||||
|
||||
echo "5. Get all items"
|
||||
curl -s "$BASE_URL/api/items/shopping_items"
|
||||
echo
|
||||
|
||||
echo "6. Add another field - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"add\", \"path\": \"/priority\", \"value\": \"high\"}"
|
||||
echo
|
||||
|
||||
echo "7. Remove a field - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"remove\", \"path\": \"/priority\"}"
|
||||
echo
|
||||
|
||||
echo "8. Soft delete - single operation"
|
||||
curl -X PATCH "$BASE_URL/api/collections/shopping_items/items/test12345678901" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"op\": \"add\", \"path\": \"/deleted_at\", \"value\": \"2025-09-29T10:00:00Z\"}"
|
||||
echo
|
||||
|
||||
echo "9. Final state"
|
||||
curl -s "$BASE_URL/api/items/shopping_items"
|
||||
echo
|
||||
|
||||
echo "✅ Simple API tests completed!"
|
60
types.go
60
types.go
@@ -1,12 +1,16 @@
|
||||
package main
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pocketbase/pocketbase"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
// Server generated sequence number of the event - ie when it was applied
|
||||
Seq int `json:"seq"`
|
||||
// Type of the event - create, update, delete, defined by the client
|
||||
Type string `json:"type"`
|
||||
// Hash of the event - server generated, gurantees the event was processed
|
||||
Hash string `json:"hash"`
|
||||
// ItemID of the item that is to be manipulated, defined by the client
|
||||
@@ -15,12 +19,23 @@ type Event struct {
|
||||
EventID string `json:"event_id"`
|
||||
// Collection of the item that is to be manipulated, defined by the client
|
||||
Collection string `json:"collection"`
|
||||
// Data that is to be used for manipulation; for create events that's the full objects and for update events that's the diff
|
||||
Data map[string]interface{} `json:"data"`
|
||||
// Single operation - ONE EVENT = ONE OPERATION
|
||||
Operation string `json:"operation"` // add, remove, replace, move, copy, test
|
||||
Path string `json:"path"` // JSON Pointer to target location
|
||||
Value string `json:"value"` // Value as string (for add/replace operations)
|
||||
From string `json:"from"` // Source path for move/copy operations
|
||||
// Timestamp of the event - server generated, when the event was processed
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// PatchOperation represents a single RFC6902 JSON Patch operation (still needed for JSONPatcher)
|
||||
type PatchOperation struct {
|
||||
Op string `json:"op"` // add, remove, replace, move, copy, test
|
||||
Path string `json:"path"` // JSON Pointer to target location
|
||||
Value interface{} `json:"value,omitempty"` // Value for add/replace operations
|
||||
From string `json:"from,omitempty"` // Source path for move/copy operations
|
||||
}
|
||||
|
||||
type SLItem struct {
|
||||
ID string
|
||||
Content string
|
||||
@@ -28,3 +43,38 @@ type SLItem struct {
|
||||
UpdatedAt time.Time
|
||||
DeletedAt time.Time
|
||||
}
|
||||
|
||||
type EventStore struct {
|
||||
app *pocketbase.PocketBase
|
||||
}
|
||||
|
||||
type SyncRequest struct {
|
||||
LastSeq int `json:"last_seq"`
|
||||
LastHash string `json:"last_hash"`
|
||||
}
|
||||
|
||||
type SyncResponse struct {
|
||||
Events []Event `json:"events"`
|
||||
CurrentSeq int `json:"current_seq"`
|
||||
CurrentHash string `json:"current_hash"`
|
||||
FullSync bool `json:"full_sync"`
|
||||
}
|
||||
|
||||
// Serialize event manually to ensure consistent field order for hashing
|
||||
func (e *Event) serialize() string {
|
||||
timestamp := e.Timestamp.Format(time.RFC3339Nano)
|
||||
|
||||
return fmt.Sprintf("seq:%d|item_id:%s|event_id:%s|collection:%s|operation:%s|path:%s|value:%s|from:%s|timestamp:%s",
|
||||
e.Seq, e.ItemID, e.EventID, e.Collection, e.Operation, e.Path, e.Value, e.From, timestamp)
|
||||
}
|
||||
|
||||
// Calculate hash of event plus previous hash
|
||||
func (e *Event) calculateHash(prevHash string) string {
|
||||
content := e.serialize() + "|prev_hash:" + prevHash
|
||||
hash := sha256.Sum256([]byte(content))
|
||||
return fmt.Sprintf("%x", hash)
|
||||
}
|
||||
|
||||
func NewEventStore(app *pocketbase.PocketBase) *EventStore {
|
||||
return &EventStore{app: app}
|
||||
}
|
||||
|
Reference in New Issue
Block a user