summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/relay/main.go103
-rw-r--r--internal/auth/README.md4
-rw-r--r--internal/auth/auth_test.go4
-rw-r--r--internal/auth/interceptor.go15
-rw-r--r--internal/config/config.go75
-rw-r--r--internal/metrics/metrics.go9
6 files changed, 155 insertions, 55 deletions
diff --git a/cmd/relay/main.go b/cmd/relay/main.go
index 3a1eeef..209d758 100644
--- a/cmd/relay/main.go
+++ b/cmd/relay/main.go
@@ -1,6 +1,7 @@
1package main 1package main
2 2
3import ( 3import (
4 "context"
4 "flag" 5 "flag"
5 "log" 6 "log"
6 "net" 7 "net"
@@ -9,8 +10,6 @@ import (
9 "os/signal" 10 "os/signal"
10 "syscall" 11 "syscall"
11 12
12 "context"
13
14 "connectrpc.com/connect" 13 "connectrpc.com/connect"
15 "golang.org/x/net/http2" 14 "golang.org/x/net/http2"
16 "golang.org/x/net/http2/h2c" 15 "golang.org/x/net/http2/h2c"
@@ -18,23 +17,27 @@ import (
18 17
19 pb "northwest.io/muxstr/api/nostr/v1" 18 pb "northwest.io/muxstr/api/nostr/v1"
20 "northwest.io/muxstr/api/nostr/v1/nostrv1connect" 19 "northwest.io/muxstr/api/nostr/v1/nostrv1connect"
20 "northwest.io/muxstr/internal/auth"
21 "northwest.io/muxstr/internal/config"
21 connecthandler "northwest.io/muxstr/internal/handler/connect" 22 connecthandler "northwest.io/muxstr/internal/handler/connect"
22 grpchandler "northwest.io/muxstr/internal/handler/grpc" 23 grpchandler "northwest.io/muxstr/internal/handler/grpc"
23 wshandler "northwest.io/muxstr/internal/handler/websocket" 24 wshandler "northwest.io/muxstr/internal/handler/websocket"
25 "northwest.io/muxstr/internal/metrics"
26 "northwest.io/muxstr/internal/ratelimit"
24 "northwest.io/muxstr/internal/storage" 27 "northwest.io/muxstr/internal/storage"
25 "northwest.io/muxstr/internal/subscription" 28 "northwest.io/muxstr/internal/subscription"
26) 29)
27 30
28func main() { 31func main() {
29 var ( 32 configFile := flag.String("config", "", "Path to config file (optional)")
30 grpcAddr = flag.String("grpc-addr", ":50051", "gRPC server address")
31 wsAddr = flag.String("ws-addr", ":8080", "WebSocket server address")
32 dbPath = flag.String("db", "relay.db", "SQLite database path")
33 publicURL = flag.String("public-url", "", "Public URL for relay (e.g., nostr-grpc.x.bdw.to)")
34 )
35 flag.Parse() 33 flag.Parse()
36 34
37 store, err := storage.New(*dbPath) 35 cfg, err := config.Load(*configFile)
36 if err != nil {
37 log.Fatalf("failed to load config: %v", err)
38 }
39
40 store, err := storage.New(cfg.Database.Path)
38 if err != nil { 41 if err != nil {
39 log.Fatalf("failed to create storage: %v", err) 42 log.Fatalf("failed to create storage: %v", err)
40 } 43 }
@@ -51,42 +54,86 @@ func main() {
51 path, handler := nostrv1connect.NewNostrRelayHandler(connectHandler, connect.WithInterceptors()) 54 path, handler := nostrv1connect.NewNostrRelayHandler(connectHandler, connect.WithInterceptors())
52 mux.Handle(path, handler) 55 mux.Handle(path, handler)
53 56
57 var serverOpts []grpc.ServerOption
58
59 if cfg.Auth.Read.Enabled || cfg.Auth.Write.Enabled {
60 authOpts := &auth.InterceptorOptions{
61 Read: cfg.Auth.Read,
62 Write: cfg.Auth.Write,
63 TimestampWindow: cfg.Auth.TimestampWindow,
64 SkipMethods: cfg.Auth.SkipMethods,
65 }
66 serverOpts = append(serverOpts,
67 grpc.UnaryInterceptor(auth.NostrUnaryInterceptor(authOpts)),
68 grpc.StreamInterceptor(auth.NostrStreamInterceptor(authOpts)),
69 )
70 }
71
72 if cfg.RateLimit.Enabled {
73 limiter := ratelimit.New(cfg.RateLimit.ToRateLimiter())
74 serverOpts = append(serverOpts,
75 grpc.ChainUnaryInterceptor(ratelimit.UnaryInterceptor(limiter)),
76 grpc.ChainStreamInterceptor(ratelimit.StreamInterceptor(limiter)),
77 )
78 }
79
80 var m *metrics.Metrics
81 if cfg.Metrics.Enabled {
82 m = metrics.New(cfg.Metrics.ToMetrics())
83 serverOpts = append(serverOpts,
84 grpc.ChainUnaryInterceptor(metrics.UnaryServerInterceptor(m)),
85 grpc.ChainStreamInterceptor(metrics.StreamServerInterceptor(m)),
86 )
87
88 go func() {
89 log.Printf("Metrics server listening on %s%s", cfg.Metrics.Addr, cfg.Metrics.Path)
90 if err := m.Serve(cfg.Metrics.Addr, cfg.Metrics.Path); err != nil {
91 log.Printf("Metrics server failed: %v", err)
92 }
93 }()
94 }
95
54 wsHandler := wshandler.NewHandler(store, subManager) 96 wsHandler := wshandler.NewHandler(store, subManager)
55 97
56 // Set public URLs for index page
57 var grpcDisplay, httpDisplay, wsDisplay string 98 var grpcDisplay, httpDisplay, wsDisplay string
58 if *publicURL != "" { 99 if cfg.Server.PublicURL != "" {
59 // Use public URLs when behind reverse proxy (port 443) 100 grpcDisplay = cfg.Server.PublicURL + ":443"
60 grpcDisplay = *publicURL + ":443" 101 httpDisplay = "https://" + cfg.Server.PublicURL
61 httpDisplay = "https://" + *publicURL 102 wsDisplay = "wss://" + cfg.Server.PublicURL
62 wsDisplay = "wss://" + *publicURL
63 } else { 103 } else {
64 // Use local addresses for development 104 grpcDisplay = cfg.Server.GrpcAddr
65 grpcDisplay = *grpcAddr 105 httpDisplay = "http://" + cfg.Server.HttpAddr
66 httpDisplay = "http://" + *wsAddr 106 wsDisplay = "ws://" + cfg.Server.HttpAddr
67 wsDisplay = "ws://" + *wsAddr
68 } 107 }
69 wsHandler.SetIndexData(grpcDisplay, httpDisplay, wsDisplay) 108 wsHandler.SetIndexData(grpcDisplay, httpDisplay, wsDisplay)
70 mux.Handle("/", wsHandler) 109 mux.Handle("/", wsHandler)
71 110
72 grpcLis, err := net.Listen("tcp", *grpcAddr) 111 grpcLis, err := net.Listen("tcp", cfg.Server.GrpcAddr)
73 if err != nil { 112 if err != nil {
74 log.Fatalf("failed to listen on gRPC port: %v", err) 113 log.Fatalf("failed to listen on gRPC port: %v", err)
75 } 114 }
76 115
77 grpcServer := grpc.NewServer() 116 grpcServer := grpc.NewServer(serverOpts...)
78 pb.RegisterNostrRelayServer(grpcServer, grpcHandler) 117 pb.RegisterNostrRelayServer(grpcServer, grpcHandler)
79 118
80 httpServer := &http.Server{ 119 httpServer := &http.Server{
81 Addr: *wsAddr, 120 Addr: cfg.Server.HttpAddr,
82 Handler: h2c.NewHandler(mux, &http2.Server{}), 121 Handler: h2c.NewHandler(mux, &http2.Server{}),
122 ReadTimeout: cfg.Server.ReadTimeout,
123 WriteTimeout: cfg.Server.WriteTimeout,
83 } 124 }
84 125
85 log.Printf("gRPC server listening on %s", *grpcAddr) 126 log.Printf("gRPC server listening on %s", cfg.Server.GrpcAddr)
86 log.Printf("HTTP server listening on %s", *wsAddr) 127 log.Printf("HTTP server listening on %s", cfg.Server.HttpAddr)
87 log.Printf(" - Connect (gRPC-Web) at %s/nostr.v1.NostrRelay/*", *wsAddr) 128 log.Printf(" - Connect (gRPC-Web) at %s/nostr.v1.NostrRelay/*", cfg.Server.HttpAddr)
88 log.Printf(" - WebSocket (Nostr) at %s/", *wsAddr) 129 log.Printf(" - WebSocket (Nostr) at %s/", cfg.Server.HttpAddr)
89 log.Printf("Database: %s", *dbPath) 130 log.Printf("Database: %s", cfg.Database.Path)
131 if cfg.Auth.Read.Enabled || cfg.Auth.Write.Enabled {
132 log.Printf("Auth: enabled (read=%v write=%v)", cfg.Auth.Read.Enabled, cfg.Auth.Write.Enabled)
133 }
134 if cfg.RateLimit.Enabled {
135 log.Printf("Rate limiting: enabled")
136 }
90 137
91 sigChan := make(chan os.Signal, 1) 138 sigChan := make(chan os.Signal, 1)
92 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 139 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
diff --git a/internal/auth/README.md b/internal/auth/README.md
index de37010..98d1437 100644
--- a/internal/auth/README.md
+++ b/internal/auth/README.md
@@ -142,11 +142,11 @@ import (
142 142
143// Create auth options 143// Create auth options
144authOpts := &auth.InterceptorOptions{ 144authOpts := &auth.InterceptorOptions{
145 Read: auth.OperationAuthConfig{ 145 Read: auth.AuthOperationConfig{
146 Enabled: true, // Require auth for reads 146 Enabled: true, // Require auth for reads
147 AllowedNpubs: nil, // Accept any valid signature 147 AllowedNpubs: nil, // Accept any valid signature
148 }, 148 },
149 Write: auth.OperationAuthConfig{ 149 Write: auth.AuthOperationConfig{
150 Enabled: true, 150 Enabled: true,
151 AllowedNpubs: []string{"hex-pubkey-1", "hex-pubkey-2"}, // Whitelist 151 AllowedNpubs: []string{"hex-pubkey-1", "hex-pubkey-2"}, // Whitelist
152 }, 152 },
diff --git a/internal/auth/auth_test.go b/internal/auth/auth_test.go
index 7b0fa13..68c68f5 100644
--- a/internal/auth/auth_test.go
+++ b/internal/auth/auth_test.go
@@ -243,11 +243,11 @@ func TestValidateAuthFromContext(t *testing.T) {
243 ctx := metadata.NewIncomingContext(context.Background(), md) 243 ctx := metadata.NewIncomingContext(context.Background(), md)
244 244
245 opts := &InterceptorOptions{ 245 opts := &InterceptorOptions{
246 Read: OperationAuthConfig{ 246 Read: AuthOperationConfig{
247 Enabled: true, 247 Enabled: true,
248 AllowedNpubs: nil, 248 AllowedNpubs: nil,
249 }, 249 },
250 Write: OperationAuthConfig{ 250 Write: AuthOperationConfig{
251 Enabled: true, 251 Enabled: true,
252 AllowedNpubs: nil, 252 AllowedNpubs: nil,
253 }, 253 },
diff --git a/internal/auth/interceptor.go b/internal/auth/interceptor.go
index 42c2688..67450ce 100644
--- a/internal/auth/interceptor.go
+++ b/internal/auth/interceptor.go
@@ -18,28 +18,25 @@ const (
18) 18)
19 19
20type InterceptorOptions struct { 20type InterceptorOptions struct {
21 Read OperationAuthConfig 21 Read AuthOperationConfig
22 Write OperationAuthConfig 22 Write AuthOperationConfig
23 TimestampWindow int64 23 TimestampWindow int64
24 ValidatePayload bool 24 ValidatePayload bool
25 SkipMethods []string 25 SkipMethods []string
26} 26}
27 27
28// OperationAuthConfig configures auth for read or write operations. 28type AuthOperationConfig struct {
29// Three states: disabled (allow all), enabled with empty list (require auth),
30// enabled with npubs (whitelist only). Npubs normalized to hex at load time.
31type OperationAuthConfig struct {
32 Enabled bool 29 Enabled bool
33 AllowedNpubs []string 30 AllowedNpubs []string
34} 31}
35 32
36func DefaultInterceptorOptions() *InterceptorOptions { 33func DefaultInterceptorOptions() *InterceptorOptions {
37 return &InterceptorOptions{ 34 return &InterceptorOptions{
38 Read: OperationAuthConfig{ 35 Read: AuthOperationConfig{
39 Enabled: false, 36 Enabled: false,
40 AllowedNpubs: nil, 37 AllowedNpubs: nil,
41 }, 38 },
42 Write: OperationAuthConfig{ 39 Write: AuthOperationConfig{
43 Enabled: false, 40 Enabled: false,
44 AllowedNpubs: nil, 41 AllowedNpubs: nil,
45 }, 42 },
@@ -154,7 +151,7 @@ func validateAuthFromContext(ctx context.Context, method string, opts *Intercept
154 151
155 pubkey := ExtractPubkey(event) 152 pubkey := ExtractPubkey(event)
156 153
157 var opConfig OperationAuthConfig 154 var opConfig AuthOperationConfig
158 if isWriteMethod(method) { 155 if isWriteMethod(method) {
159 opConfig = opts.Write 156 opConfig = opts.Write
160 } else { 157 } else {
diff --git a/internal/config/config.go b/internal/config/config.go
index 36c8be5..dcceade 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -7,6 +7,9 @@ import (
7 "time" 7 "time"
8 8
9 "gopkg.in/yaml.v3" 9 "gopkg.in/yaml.v3"
10 "northwest.io/muxstr/internal/auth"
11 "northwest.io/muxstr/internal/metrics"
12 "northwest.io/muxstr/internal/ratelimit"
10 "northwest.io/nostr" 13 "northwest.io/nostr"
11) 14)
12 15
@@ -33,18 +36,10 @@ type DatabaseConfig struct {
33} 36}
34 37
35type AuthConfig struct { 38type AuthConfig struct {
36 Read AuthOperationConfig `yaml:"read"` 39 Read auth.AuthOperationConfig `yaml:"read"`
37 Write AuthOperationConfig `yaml:"write"` 40 Write auth.AuthOperationConfig `yaml:"write"`
38 TimestampWindow int64 `yaml:"timestamp_window"` 41 TimestampWindow int64 `yaml:"timestamp_window"`
39 SkipMethods []string `yaml:"skip_methods"` 42 SkipMethods []string `yaml:"skip_methods"`
40}
41
42// AuthOperationConfig configures auth for read or write operations.
43// Three states: disabled (allow all), enabled with empty list (require auth),
44// enabled with npubs (whitelist only). Npubs normalized to hex at load time.
45type AuthOperationConfig struct {
46 Enabled bool `yaml:"enabled"`
47 AllowedNpubs []string `yaml:"allowed_npubs"`
48} 43}
49 44
50type RateLimitConfig struct { 45type RateLimitConfig struct {
@@ -105,11 +100,11 @@ func Default() *Config {
105 Path: "relay.db", 100 Path: "relay.db",
106 }, 101 },
107 Auth: AuthConfig{ 102 Auth: AuthConfig{
108 Read: AuthOperationConfig{ 103 Read: auth.AuthOperationConfig{
109 Enabled: false, 104 Enabled: false,
110 AllowedNpubs: nil, 105 AllowedNpubs: nil,
111 }, 106 },
112 Write: AuthOperationConfig{ 107 Write: auth.AuthOperationConfig{
113 Enabled: false, 108 Enabled: false,
114 AllowedNpubs: nil, 109 AllowedNpubs: nil,
115 }, 110 },
@@ -349,3 +344,55 @@ func (c *Config) Save(filename string) error {
349 344
350 return nil 345 return nil
351} 346}
347
348func (r *RateLimitConfig) ToRateLimiter() *ratelimit.Config {
349 rlConfig := &ratelimit.Config{
350 RequestsPerSecond: r.DefaultRPS,
351 BurstSize: r.DefaultBurst,
352 IPRequestsPerSecond: r.IPRPS,
353 IPBurstSize: r.IPBurst,
354 SkipMethods: r.SkipMethods,
355 SkipUsers: r.SkipUsers,
356 CleanupInterval: r.CleanupInterval,
357 MaxIdleTime: r.MaxIdleTime,
358 }
359
360 if r.Methods != nil {
361 rlConfig.MethodLimits = make(map[string]ratelimit.MethodLimit, len(r.Methods))
362 for method, limit := range r.Methods {
363 rlConfig.MethodLimits[method] = ratelimit.MethodLimit{
364 RequestsPerSecond: limit.RPS,
365 BurstSize: limit.Burst,
366 }
367 }
368 }
369
370 if r.Users != nil {
371 rlConfig.UserLimits = make(map[string]ratelimit.UserLimit, len(r.Users))
372 for user, limit := range r.Users {
373 userLimit := ratelimit.UserLimit{
374 RequestsPerSecond: limit.RPS,
375 BurstSize: limit.Burst,
376 }
377 if limit.Methods != nil {
378 userLimit.MethodLimits = make(map[string]ratelimit.MethodLimit, len(limit.Methods))
379 for method, methodLimit := range limit.Methods {
380 userLimit.MethodLimits[method] = ratelimit.MethodLimit{
381 RequestsPerSecond: methodLimit.RPS,
382 BurstSize: methodLimit.Burst,
383 }
384 }
385 }
386 rlConfig.UserLimits[user] = userLimit
387 }
388 }
389
390 return rlConfig
391}
392
393func (m *MetricsConfig) ToMetrics() *metrics.Config {
394 return &metrics.Config{
395 Namespace: m.Namespace,
396 Subsystem: m.Subsystem,
397 }
398}
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 3cb675f..9030d67 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -1,8 +1,11 @@
1package metrics 1package metrics
2 2
3import ( 3import (
4 "net/http"
5
4 "github.com/prometheus/client_golang/prometheus" 6 "github.com/prometheus/client_golang/prometheus"
5 "github.com/prometheus/client_golang/prometheus/promauto" 7 "github.com/prometheus/client_golang/prometheus/promauto"
8 "github.com/prometheus/client_golang/prometheus/promhttp"
6) 9)
7 10
8// Metrics holds all Prometheus metrics for the relay. 11// Metrics holds all Prometheus metrics for the relay.
@@ -280,3 +283,9 @@ const (
280 StatusRateLimited RequestStatus = "rate_limited" 283 StatusRateLimited RequestStatus = "rate_limited"
281 StatusInvalidRequest RequestStatus = "invalid_request" 284 StatusInvalidRequest RequestStatus = "invalid_request"
282) 285)
286
287func (m *Metrics) Serve(addr, path string) error {
288 mux := http.NewServeMux()
289 mux.Handle(path, promhttp.Handler())
290 return http.ListenAndServe(addr, mux)
291}