summaryrefslogtreecommitdiffstats
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/storage/events.go216
-rw-r--r--internal/storage/events_test.go264
-rw-r--r--internal/storage/storage.go150
-rw-r--r--internal/storage/storage_test.go70
4 files changed, 700 insertions, 0 deletions
diff --git a/internal/storage/events.go b/internal/storage/events.go
new file mode 100644
index 0000000..d74fc7e
--- /dev/null
+++ b/internal/storage/events.go
@@ -0,0 +1,216 @@
1package storage
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9
10 "github.com/klauspost/compress/zstd"
11 "google.golang.org/protobuf/proto"
12
13 pb "northwest.io/nostr-grpc/api/nostr/v1"
14)
15
16var (
17 // ErrEventNotFound is returned when an event ID is not found in storage.
18 ErrEventNotFound = errors.New("event not found")
19
20 // ErrEventExists is returned when attempting to store a duplicate event.
21 ErrEventExists = errors.New("event already exists")
22)
23
24// EventData holds the data needed to store an event.
25type EventData struct {
26 Event *pb.Event // Protobuf event
27 CanonicalJSON []byte // Uncompressed canonical JSON (will be compressed on storage)
28}
29
30// StoreEvent stores an event in the database.
31// Returns ErrEventExists if the event ID already exists.
32func (s *Storage) StoreEvent(ctx context.Context, data *EventData) error {
33 // Serialize protobuf
34 eventBytes, err := proto.Marshal(data.Event)
35 if err != nil {
36 return fmt.Errorf("failed to marshal event: %w", err)
37 }
38
39 // Compress canonical JSON
40 compressedJSON, err := compressJSON(data.CanonicalJSON)
41 if err != nil {
42 return fmt.Errorf("failed to compress canonical JSON: %w", err)
43 }
44
45 // Serialize tags to JSON
46 tagsJSON, err := marshalTags(data.Event.Tags)
47 if err != nil {
48 return fmt.Errorf("failed to marshal tags: %w", err)
49 }
50
51 // Insert event
52 query := `
53 INSERT INTO events (id, event_data, canonical_json, pubkey, kind, created_at, content, tags, sig)
54 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
55 `
56
57 _, err = s.db.ExecContext(ctx, query,
58 data.Event.Id,
59 eventBytes,
60 compressedJSON,
61 data.Event.Pubkey,
62 data.Event.Kind,
63 data.Event.CreatedAt,
64 data.Event.Content,
65 tagsJSON,
66 data.Event.Sig,
67 )
68
69 if err != nil {
70 // Check for unique constraint violation (duplicate ID)
71 if isDuplicateError(err) {
72 return ErrEventExists
73 }
74 return fmt.Errorf("failed to insert event: %w", err)
75 }
76
77 return nil
78}
79
80// GetEvent retrieves an event by ID.
81// Returns ErrEventNotFound if the event doesn't exist.
82func (s *Storage) GetEvent(ctx context.Context, id string) (*pb.Event, error) {
83 query := `
84 SELECT event_data, canonical_json
85 FROM events
86 WHERE id = ? AND deleted = 0
87 `
88
89 var eventBytes, compressedJSON []byte
90 err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON)
91
92 if err == sql.ErrNoRows {
93 return nil, ErrEventNotFound
94 }
95 if err != nil {
96 return nil, fmt.Errorf("failed to query event: %w", err)
97 }
98
99 // Deserialize protobuf
100 event := &pb.Event{}
101 if err := proto.Unmarshal(eventBytes, event); err != nil {
102 return nil, fmt.Errorf("failed to unmarshal event: %w", err)
103 }
104
105 return event, nil
106}
107
108// GetEventWithCanonical retrieves an event by ID with its canonical JSON.
109// The canonical_json field will be populated in the returned event.
110func (s *Storage) GetEventWithCanonical(ctx context.Context, id string) (*pb.Event, error) {
111 query := `
112 SELECT event_data, canonical_json
113 FROM events
114 WHERE id = ? AND deleted = 0
115 `
116
117 var eventBytes, compressedJSON []byte
118 err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON)
119
120 if err == sql.ErrNoRows {
121 return nil, ErrEventNotFound
122 }
123 if err != nil {
124 return nil, fmt.Errorf("failed to query event: %w", err)
125 }
126
127 // Deserialize protobuf
128 event := &pb.Event{}
129 if err := proto.Unmarshal(eventBytes, event); err != nil {
130 return nil, fmt.Errorf("failed to unmarshal event: %w", err)
131 }
132
133 // Decompress canonical JSON
134 canonicalJSON, err := decompressJSON(compressedJSON)
135 if err != nil {
136 return nil, fmt.Errorf("failed to decompress canonical JSON: %w", err)
137 }
138
139 event.CanonicalJson = canonicalJSON
140
141 return event, nil
142}
143
144// DeleteEvent marks an event as deleted (soft delete).
145func (s *Storage) DeleteEvent(ctx context.Context, eventID string) error {
146 query := `UPDATE events SET deleted = 1 WHERE id = ?`
147 result, err := s.db.ExecContext(ctx, query, eventID)
148 if err != nil {
149 return fmt.Errorf("failed to delete event: %w", err)
150 }
151
152 rows, err := result.RowsAffected()
153 if err != nil {
154 return fmt.Errorf("failed to get rows affected: %w", err)
155 }
156
157 if rows == 0 {
158 return ErrEventNotFound
159 }
160
161 return nil
162}
163
164// compressJSON compresses JSON bytes using zstd.
165func compressJSON(data []byte) ([]byte, error) {
166 encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
167 if err != nil {
168 return nil, err
169 }
170 defer encoder.Close()
171
172 return encoder.EncodeAll(data, nil), nil
173}
174
175// decompressJSON decompresses zstd-compressed JSON bytes.
176func decompressJSON(data []byte) ([]byte, error) {
177 decoder, err := zstd.NewReader(nil)
178 if err != nil {
179 return nil, err
180 }
181 defer decoder.Close()
182
183 return decoder.DecodeAll(data, nil)
184}
185
186// marshalTags converts protobuf tags to JSON string.
187func marshalTags(tags []*pb.Tag) (string, error) {
188 if len(tags) == 0 {
189 return "[]", nil
190 }
191
192 // Convert to [][]string for JSON serialization
193 tagArrays := make([][]string, len(tags))
194 for i, tag := range tags {
195 tagArrays[i] = tag.Values
196 }
197
198 data, err := json.Marshal(tagArrays)
199 if err != nil {
200 return "", err
201 }
202
203 return string(data), nil
204}
205
206// isDuplicateError checks if the error is a unique constraint violation.
207func isDuplicateError(err error) bool {
208 if err == nil {
209 return false
210 }
211 // SQLite error messages for UNIQUE constraint violations
212 msg := err.Error()
213 return msg == "UNIQUE constraint failed: events.id" ||
214 msg == "constraint failed: UNIQUE constraint failed: events.id" ||
215 msg == "constraint failed: UNIQUE constraint failed: events.id (1555)"
216}
diff --git a/internal/storage/events_test.go b/internal/storage/events_test.go
new file mode 100644
index 0000000..4393404
--- /dev/null
+++ b/internal/storage/events_test.go
@@ -0,0 +1,264 @@
1package storage
2
3import (
4 "context"
5 "testing"
6
7 pb "northwest.io/nostr-grpc/api/nostr/v1"
8)
9
10func TestStoreEvent(t *testing.T) {
11 store, err := New(":memory:")
12 if err != nil {
13 t.Fatalf("failed to create storage: %v", err)
14 }
15 defer store.Close()
16
17 ctx := context.Background()
18
19 // Create test event
20 event := &pb.Event{
21 Id: "test123",
22 Pubkey: "pubkey123",
23 CreatedAt: 1234567890,
24 Kind: 1,
25 Tags: []*pb.Tag{{Values: []string{"e", "event1"}}},
26 Content: "Hello, Nostr!",
27 Sig: "sig123",
28 }
29
30 canonicalJSON := []byte(`[0,"pubkey123",1234567890,1,[["e","event1"]],"Hello, Nostr!"]`)
31
32 data := &EventData{
33 Event: event,
34 CanonicalJSON: canonicalJSON,
35 }
36
37 // Store event
38 err = store.StoreEvent(ctx, data)
39 if err != nil {
40 t.Fatalf("failed to store event: %v", err)
41 }
42
43 // Verify event was stored
44 retrieved, err := store.GetEvent(ctx, "test123")
45 if err != nil {
46 t.Fatalf("failed to retrieve event: %v", err)
47 }
48
49 if retrieved.Id != event.Id {
50 t.Errorf("expected ID %s, got %s", event.Id, retrieved.Id)
51 }
52 if retrieved.Content != event.Content {
53 t.Errorf("expected content %s, got %s", event.Content, retrieved.Content)
54 }
55}
56
57func TestStoreEventDuplicate(t *testing.T) {
58 store, err := New(":memory:")
59 if err != nil {
60 t.Fatalf("failed to create storage: %v", err)
61 }
62 defer store.Close()
63
64 ctx := context.Background()
65
66 event := &pb.Event{
67 Id: "duplicate123",
68 Pubkey: "pubkey123",
69 CreatedAt: 1234567890,
70 Kind: 1,
71 Content: "test",
72 Sig: "sig123",
73 }
74
75 data := &EventData{
76 Event: event,
77 CanonicalJSON: []byte(`[0,"pubkey123",1234567890,1,[],"test"]`),
78 }
79
80 // Store first time
81 err = store.StoreEvent(ctx, data)
82 if err != nil {
83 t.Fatalf("failed to store event first time: %v", err)
84 }
85
86 // Try to store again
87 err = store.StoreEvent(ctx, data)
88 if err != ErrEventExists {
89 t.Errorf("expected ErrEventExists, got %v", err)
90 }
91}
92
93func TestGetEvent(t *testing.T) {
94 store, err := New(":memory:")
95 if err != nil {
96 t.Fatalf("failed to create storage: %v", err)
97 }
98 defer store.Close()
99
100 ctx := context.Background()
101
102 // Test non-existent event
103 _, err = store.GetEvent(ctx, "nonexistent")
104 if err != ErrEventNotFound {
105 t.Errorf("expected ErrEventNotFound, got %v", err)
106 }
107}
108
109func TestGetEventWithCanonical(t *testing.T) {
110 store, err := New(":memory:")
111 if err != nil {
112 t.Fatalf("failed to create storage: %v", err)
113 }
114 defer store.Close()
115
116 ctx := context.Background()
117
118 canonicalJSON := []byte(`[0,"pubkey123",1234567890,1,[],"test"]`)
119
120 event := &pb.Event{
121 Id: "canonical123",
122 Pubkey: "pubkey123",
123 CreatedAt: 1234567890,
124 Kind: 1,
125 Content: "test",
126 Sig: "sig123",
127 }
128
129 data := &EventData{
130 Event: event,
131 CanonicalJSON: canonicalJSON,
132 }
133
134 err = store.StoreEvent(ctx, data)
135 if err != nil {
136 t.Fatalf("failed to store event: %v", err)
137 }
138
139 // Retrieve with canonical JSON
140 retrieved, err := store.GetEventWithCanonical(ctx, "canonical123")
141 if err != nil {
142 t.Fatalf("failed to retrieve event: %v", err)
143 }
144
145 if string(retrieved.CanonicalJson) != string(canonicalJSON) {
146 t.Errorf("canonical JSON mismatch:\nexpected: %s\ngot: %s",
147 canonicalJSON, retrieved.CanonicalJson)
148 }
149}
150
151func TestDeleteEvent(t *testing.T) {
152 store, err := New(":memory:")
153 if err != nil {
154 t.Fatalf("failed to create storage: %v", err)
155 }
156 defer store.Close()
157
158 ctx := context.Background()
159
160 event := &pb.Event{
161 Id: "delete123",
162 Pubkey: "pubkey123",
163 CreatedAt: 1234567890,
164 Kind: 1,
165 Content: "to be deleted",
166 Sig: "sig123",
167 }
168
169 data := &EventData{
170 Event: event,
171 CanonicalJSON: []byte(`[0,"pubkey123",1234567890,1,[],"to be deleted"]`),
172 }
173
174 // Store event
175 err = store.StoreEvent(ctx, data)
176 if err != nil {
177 t.Fatalf("failed to store event: %v", err)
178 }
179
180 // Delete event
181 err = store.DeleteEvent(ctx, "delete123")
182 if err != nil {
183 t.Fatalf("failed to delete event: %v", err)
184 }
185
186 // Verify event is no longer retrievable
187 _, err = store.GetEvent(ctx, "delete123")
188 if err != ErrEventNotFound {
189 t.Errorf("expected ErrEventNotFound after deletion, got %v", err)
190 }
191
192 // Try deleting non-existent event
193 err = store.DeleteEvent(ctx, "nonexistent")
194 if err != ErrEventNotFound {
195 t.Errorf("expected ErrEventNotFound, got %v", err)
196 }
197}
198
199func TestCompressDecompressJSON(t *testing.T) {
200 original := []byte(`{"key":"value","array":[1,2,3],"nested":{"a":"b"}}`)
201
202 compressed, err := compressJSON(original)
203 if err != nil {
204 t.Fatalf("compression failed: %v", err)
205 }
206
207 // Verify compression reduces size (for larger data)
208 if len(compressed) >= len(original) {
209 t.Logf("Note: compressed size (%d) >= original (%d) - normal for small data",
210 len(compressed), len(original))
211 }
212
213 decompressed, err := decompressJSON(compressed)
214 if err != nil {
215 t.Fatalf("decompression failed: %v", err)
216 }
217
218 if string(decompressed) != string(original) {
219 t.Errorf("decompressed data doesn't match original:\nexpected: %s\ngot: %s",
220 original, decompressed)
221 }
222}
223
224func TestMarshalTags(t *testing.T) {
225 tests := []struct {
226 name string
227 tags []*pb.Tag
228 expected string
229 }{
230 {
231 name: "empty tags",
232 tags: nil,
233 expected: "[]",
234 },
235 {
236 name: "single tag",
237 tags: []*pb.Tag{
238 {Values: []string{"e", "event123"}},
239 },
240 expected: `[["e","event123"]]`,
241 },
242 {
243 name: "multiple tags",
244 tags: []*pb.Tag{
245 {Values: []string{"e", "event123", "wss://relay.example.com"}},
246 {Values: []string{"p", "pubkey456"}},
247 },
248 expected: `[["e","event123","wss://relay.example.com"],["p","pubkey456"]]`,
249 },
250 }
251
252 for _, tt := range tests {
253 t.Run(tt.name, func(t *testing.T) {
254 result, err := marshalTags(tt.tags)
255 if err != nil {
256 t.Fatalf("marshalTags failed: %v", err)
257 }
258
259 if result != tt.expected {
260 t.Errorf("expected %s, got %s", tt.expected, result)
261 }
262 })
263 }
264}
diff --git a/internal/storage/storage.go b/internal/storage/storage.go
new file mode 100644
index 0000000..64fc4c6
--- /dev/null
+++ b/internal/storage/storage.go
@@ -0,0 +1,150 @@
1package storage
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7
8 _ "modernc.org/sqlite" // Pure Go SQLite driver
9)
10
11// Storage provides event persistence using SQLite.
12// Consumers should define their own interface based on their needs.
13type Storage struct {
14 db *sql.DB
15}
16
17// New creates a new Storage instance and initializes the database schema.
18// The dbPath should be a file path or ":memory:" for in-memory database.
19func New(dbPath string) (*Storage, error) {
20 db, err := sql.Open("sqlite", dbPath)
21 if err != nil {
22 return nil, fmt.Errorf("failed to open database: %w", err)
23 }
24
25 // Configure SQLite for optimal performance
26 pragmas := []string{
27 "PRAGMA journal_mode=WAL", // Write-Ahead Logging for concurrency
28 "PRAGMA synchronous=NORMAL", // Balance safety and performance
29 "PRAGMA cache_size=10000", // 10000 pages (~40MB cache)
30 "PRAGMA temp_store=MEMORY", // Temp tables in memory
31 "PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O
32 "PRAGMA page_size=4096", // Standard page size
33 "PRAGMA foreign_keys=ON", // Enforce foreign key constraints
34 "PRAGMA busy_timeout=5000", // Wait up to 5s for locks
35 "PRAGMA auto_vacuum=INCREMENTAL", // Reclaim space incrementally
36 }
37
38 for _, pragma := range pragmas {
39 if _, err := db.Exec(pragma); err != nil {
40 db.Close()
41 return nil, fmt.Errorf("failed to set pragma %q: %w", pragma, err)
42 }
43 }
44
45 s := &Storage{db: db}
46
47 // Initialize schema
48 if err := s.initSchema(context.Background()); err != nil {
49 db.Close()
50 return nil, fmt.Errorf("failed to initialize schema: %w", err)
51 }
52
53 return s, nil
54}
55
56// Close closes the database connection.
57func (s *Storage) Close() error {
58 return s.db.Close()
59}
60
61// initSchema creates all necessary tables and indexes.
62func (s *Storage) initSchema(ctx context.Context) error {
63 schema := `
64 -- Main events table
65 CREATE TABLE IF NOT EXISTS events (
66 -- Primary event data
67 id TEXT PRIMARY KEY,
68 event_data BLOB NOT NULL, -- Protobuf binary
69 canonical_json BLOB NOT NULL, -- zstd compressed canonical JSON
70
71 -- Denormalized fields for efficient querying
72 pubkey TEXT NOT NULL,
73 kind INTEGER NOT NULL,
74 created_at INTEGER NOT NULL, -- Unix timestamp
75 content TEXT, -- For full-text search (optional)
76 tags TEXT, -- JSON text for tag queries (use json_* functions)
77 sig TEXT NOT NULL,
78
79 -- Metadata
80 deleted INTEGER DEFAULT 0, -- STRICT mode: use INTEGER for boolean
81 received_at INTEGER DEFAULT (unixepoch())
82 ) STRICT;
83
84 -- Critical indexes for Nostr query patterns
85 CREATE INDEX IF NOT EXISTS idx_pubkey_created
86 ON events(pubkey, created_at DESC)
87 WHERE deleted = 0;
88
89 CREATE INDEX IF NOT EXISTS idx_kind_created
90 ON events(kind, created_at DESC)
91 WHERE deleted = 0;
92
93 CREATE INDEX IF NOT EXISTS idx_created
94 ON events(created_at DESC)
95 WHERE deleted = 0;
96
97 -- For tag queries (#e, #p, etc)
98 CREATE INDEX IF NOT EXISTS idx_tags
99 ON events(tags)
100 WHERE deleted = 0;
101
102 -- Deletion events (NIP-09)
103 CREATE TABLE IF NOT EXISTS deletions (
104 event_id TEXT PRIMARY KEY, -- ID of deletion event
105 deleted_event_id TEXT NOT NULL, -- ID of event being deleted
106 pubkey TEXT NOT NULL, -- Who requested deletion
107 created_at INTEGER NOT NULL,
108 FOREIGN KEY (deleted_event_id) REFERENCES events(id)
109 ) STRICT;
110
111 CREATE INDEX IF NOT EXISTS idx_deleted_event
112 ON deletions(deleted_event_id);
113
114 -- Replaceable events tracking (NIP-16, NIP-33)
115 CREATE TABLE IF NOT EXISTS replaceable_events (
116 kind INTEGER NOT NULL,
117 pubkey TEXT NOT NULL,
118 d_tag TEXT NOT NULL DEFAULT '', -- For parameterized replaceable events (empty string for non-parameterized)
119 current_event_id TEXT NOT NULL,
120 created_at INTEGER NOT NULL,
121 PRIMARY KEY (kind, pubkey, d_tag),
122 FOREIGN KEY (current_event_id) REFERENCES events(id)
123 ) STRICT;
124
125 -- Auth challenges (NIP-42)
126 CREATE TABLE IF NOT EXISTS auth_challenges (
127 challenge TEXT PRIMARY KEY,
128 created_at INTEGER NOT NULL,
129 expires_at INTEGER NOT NULL,
130 used INTEGER DEFAULT 0 -- STRICT mode: use INTEGER for boolean
131 ) STRICT;
132
133 -- Rate limiting
134 CREATE TABLE IF NOT EXISTS rate_limits (
135 pubkey TEXT PRIMARY KEY,
136 event_count INTEGER DEFAULT 0,
137 window_start INTEGER NOT NULL,
138 last_reset INTEGER DEFAULT (unixepoch())
139 ) STRICT;
140 `
141
142 _, err := s.db.ExecContext(ctx, schema)
143 return err
144}
145
146// DB returns the underlying *sql.DB for advanced usage.
147// This allows consumers to execute custom queries if needed.
148func (s *Storage) DB() *sql.DB {
149 return s.db
150}
diff --git a/internal/storage/storage_test.go b/internal/storage/storage_test.go
new file mode 100644
index 0000000..f2fe401
--- /dev/null
+++ b/internal/storage/storage_test.go
@@ -0,0 +1,70 @@
1package storage
2
3import (
4 "testing"
5)
6
7func TestNew(t *testing.T) {
8 // Test in-memory database
9 store, err := New(":memory:")
10 if err != nil {
11 t.Fatalf("failed to create storage: %v", err)
12 }
13 defer store.Close()
14
15 // Verify database is accessible
16 if store.DB() == nil {
17 t.Fatal("DB() returned nil")
18 }
19
20 // Verify schema was created by checking if tables exist
21 var count int
22 query := `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN ('events', 'deletions', 'replaceable_events')`
23 err = store.DB().QueryRow(query).Scan(&count)
24 if err != nil {
25 t.Fatalf("failed to query tables: %v", err)
26 }
27
28 if count != 3 {
29 t.Errorf("expected 3 main tables, got %d", count)
30 }
31}
32
33func TestNewFileDatabase(t *testing.T) {
34 // Test file-based database
35 dbPath := t.TempDir() + "/test.db"
36 store, err := New(dbPath)
37 if err != nil {
38 t.Fatalf("failed to create file-based storage: %v", err)
39 }
40 defer store.Close()
41
42 // Verify WAL mode is enabled
43 var walMode string
44 err = store.DB().QueryRow("PRAGMA journal_mode").Scan(&walMode)
45 if err != nil {
46 t.Fatalf("failed to query journal mode: %v", err)
47 }
48
49 if walMode != "wal" {
50 t.Errorf("expected WAL mode, got %s", walMode)
51 }
52}
53
54func TestClose(t *testing.T) {
55 store, err := New(":memory:")
56 if err != nil {
57 t.Fatalf("failed to create storage: %v", err)
58 }
59
60 err = store.Close()
61 if err != nil {
62 t.Errorf("failed to close storage: %v", err)
63 }
64
65 // Verify database is closed by attempting a query
66 err = store.DB().Ping()
67 if err == nil {
68 t.Error("expected error when pinging closed database")
69 }
70}