summaryrefslogtreecommitdiffstats
path: root/internal/storage/events.go
blob: 836f795818c3d42dece35671be36eddf894d92f6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package storage

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/klauspost/compress/zstd"
	"google.golang.org/protobuf/proto"

	pb "northwest.io/muxstr/api/nostr/v1"
)

var (
	// ErrEventNotFound is returned when an event ID is not found in storage.
	ErrEventNotFound = errors.New("event not found")

	// ErrEventExists is returned when attempting to store a duplicate event.
	ErrEventExists = errors.New("event already exists")
)

// EventData holds the data needed to store an event.
type EventData struct {
	Event         *pb.Event // Protobuf event
	CanonicalJSON []byte    // Uncompressed canonical JSON (will be compressed on storage)
}

// StoreEvent stores an event in the database.
// Returns ErrEventExists if the event ID already exists.
func (s *Storage) StoreEvent(ctx context.Context, data *EventData) error {
	// Serialize protobuf
	eventBytes, err := proto.Marshal(data.Event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	// Compress canonical JSON
	compressedJSON, err := compressJSON(data.CanonicalJSON)
	if err != nil {
		return fmt.Errorf("failed to compress canonical JSON: %w", err)
	}

	// Serialize tags to JSON
	tagsJSON, err := marshalTags(data.Event.Tags)
	if err != nil {
		return fmt.Errorf("failed to marshal tags: %w", err)
	}

	// Insert event
	query := `
		INSERT INTO events (id, event_data, canonical_json, pubkey, kind, created_at, content, tags, sig)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
	`

	_, err = s.db.ExecContext(ctx, query,
		data.Event.Id,
		eventBytes,
		compressedJSON,
		data.Event.Pubkey,
		data.Event.Kind,
		data.Event.CreatedAt,
		data.Event.Content,
		tagsJSON,
		data.Event.Sig,
	)

	if err != nil {
		// Check for unique constraint violation (duplicate ID)
		if isDuplicateError(err) {
			return ErrEventExists
		}
		return fmt.Errorf("failed to insert event: %w", err)
	}

	return nil
}

// GetEvent retrieves an event by ID.
// Returns ErrEventNotFound if the event doesn't exist.
func (s *Storage) GetEvent(ctx context.Context, id string) (*pb.Event, error) {
	query := `
		SELECT event_data, canonical_json
		FROM events
		WHERE id = ? AND deleted = 0
	`

	var eventBytes, compressedJSON []byte
	err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON)

	if err == sql.ErrNoRows {
		return nil, ErrEventNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("failed to query event: %w", err)
	}

	// Deserialize protobuf
	event := &pb.Event{}
	if err := proto.Unmarshal(eventBytes, event); err != nil {
		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
	}

	return event, nil
}

// GetEventWithCanonical retrieves an event by ID with its canonical JSON.
// The canonical_json field will be populated in the returned event.
func (s *Storage) GetEventWithCanonical(ctx context.Context, id string) (*pb.Event, error) {
	query := `
		SELECT event_data, canonical_json
		FROM events
		WHERE id = ? AND deleted = 0
	`

	var eventBytes, compressedJSON []byte
	err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON)

	if err == sql.ErrNoRows {
		return nil, ErrEventNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("failed to query event: %w", err)
	}

	// Deserialize protobuf
	event := &pb.Event{}
	if err := proto.Unmarshal(eventBytes, event); err != nil {
		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
	}

	// Decompress canonical JSON
	canonicalJSON, err := decompressJSON(compressedJSON)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress canonical JSON: %w", err)
	}

	event.CanonicalJson = canonicalJSON

	return event, nil
}

// DeleteEvent marks an event as deleted (soft delete).
func (s *Storage) DeleteEvent(ctx context.Context, eventID string) error {
	query := `UPDATE events SET deleted = 1 WHERE id = ?`
	result, err := s.db.ExecContext(ctx, query, eventID)
	if err != nil {
		return fmt.Errorf("failed to delete event: %w", err)
	}

	rows, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}

	if rows == 0 {
		return ErrEventNotFound
	}

	return nil
}

// compressJSON compresses JSON bytes using zstd.
func compressJSON(data []byte) ([]byte, error) {
	encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
	if err != nil {
		return nil, err
	}
	defer encoder.Close()

	return encoder.EncodeAll(data, nil), nil
}

// decompressJSON decompresses zstd-compressed JSON bytes.
func decompressJSON(data []byte) ([]byte, error) {
	decoder, err := zstd.NewReader(nil)
	if err != nil {
		return nil, err
	}
	defer decoder.Close()

	return decoder.DecodeAll(data, nil)
}

// marshalTags converts protobuf tags to JSON string.
func marshalTags(tags []*pb.Tag) (string, error) {
	if len(tags) == 0 {
		return "[]", nil
	}

	// Convert to [][]string for JSON serialization
	tagArrays := make([][]string, len(tags))
	for i, tag := range tags {
		tagArrays[i] = tag.Values
	}

	data, err := json.Marshal(tagArrays)
	if err != nil {
		return "", err
	}

	return string(data), nil
}

// isDuplicateError checks if the error is a unique constraint violation.
func isDuplicateError(err error) bool {
	if err == nil {
		return false
	}
	// SQLite error messages for UNIQUE constraint violations
	msg := err.Error()
	return msg == "UNIQUE constraint failed: events.id" ||
		msg == "constraint failed: UNIQUE constraint failed: events.id" ||
		msg == "constraint failed: UNIQUE constraint failed: events.id (1555)"
}