From ea4f508f5ee91b370c6912cde26b1a432380d037 Mon Sep 17 00:00:00 2001 From: bndw Date: Sat, 14 Feb 2026 12:14:19 -0800 Subject: feat: integrate config system into relay main.go Add support for loading configuration from YAML file via -config flag. Wire up auth, rate limiting, and metrics interceptors based on config. Changes: - Add -config flag to relay command - Use config types directly in auth package (AuthOperationConfig) - Add conversion methods: RateLimitConfig.ToRateLimiter(), MetricsConfig.ToMetrics() - Add Metrics.Serve() method for prometheus HTTP endpoint - Update main.go to initialize interceptors from config - Fix type naming: OperationAuthConfig -> AuthOperationConfig for consistency Config now supports complete relay setup including auth read/write allowlists, rate limiting, and prometheus metrics. --- cmd/relay/main.go | 103 +++++++++++++++++++++++++++++++------------ internal/auth/README.md | 4 +- internal/auth/auth_test.go | 4 +- internal/auth/interceptor.go | 15 +++---- internal/config/config.go | 75 +++++++++++++++++++++++++------ internal/metrics/metrics.go | 9 ++++ 6 files changed, 155 insertions(+), 55 deletions(-) diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 3a1eeef..209d758 100644 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "log" "net" @@ -9,8 +10,6 @@ import ( "os/signal" "syscall" - "context" - "connectrpc.com/connect" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" @@ -18,23 +17,27 @@ import ( pb "northwest.io/muxstr/api/nostr/v1" "northwest.io/muxstr/api/nostr/v1/nostrv1connect" + "northwest.io/muxstr/internal/auth" + "northwest.io/muxstr/internal/config" connecthandler "northwest.io/muxstr/internal/handler/connect" grpchandler "northwest.io/muxstr/internal/handler/grpc" wshandler "northwest.io/muxstr/internal/handler/websocket" + "northwest.io/muxstr/internal/metrics" + "northwest.io/muxstr/internal/ratelimit" "northwest.io/muxstr/internal/storage" "northwest.io/muxstr/internal/subscription" ) func main() { - var ( - grpcAddr = flag.String("grpc-addr", ":50051", "gRPC server address") - wsAddr = flag.String("ws-addr", ":8080", "WebSocket server address") - dbPath = flag.String("db", "relay.db", "SQLite database path") - publicURL = flag.String("public-url", "", "Public URL for relay (e.g., nostr-grpc.x.bdw.to)") - ) + configFile := flag.String("config", "", "Path to config file (optional)") flag.Parse() - store, err := storage.New(*dbPath) + cfg, err := config.Load(*configFile) + if err != nil { + log.Fatalf("failed to load config: %v", err) + } + + store, err := storage.New(cfg.Database.Path) if err != nil { log.Fatalf("failed to create storage: %v", err) } @@ -51,42 +54,86 @@ func main() { path, handler := nostrv1connect.NewNostrRelayHandler(connectHandler, connect.WithInterceptors()) mux.Handle(path, handler) + var serverOpts []grpc.ServerOption + + if cfg.Auth.Read.Enabled || cfg.Auth.Write.Enabled { + authOpts := &auth.InterceptorOptions{ + Read: cfg.Auth.Read, + Write: cfg.Auth.Write, + TimestampWindow: cfg.Auth.TimestampWindow, + SkipMethods: cfg.Auth.SkipMethods, + } + serverOpts = append(serverOpts, + grpc.UnaryInterceptor(auth.NostrUnaryInterceptor(authOpts)), + grpc.StreamInterceptor(auth.NostrStreamInterceptor(authOpts)), + ) + } + + if cfg.RateLimit.Enabled { + limiter := ratelimit.New(cfg.RateLimit.ToRateLimiter()) + serverOpts = append(serverOpts, + grpc.ChainUnaryInterceptor(ratelimit.UnaryInterceptor(limiter)), + grpc.ChainStreamInterceptor(ratelimit.StreamInterceptor(limiter)), + ) + } + + var m *metrics.Metrics + if cfg.Metrics.Enabled { + m = metrics.New(cfg.Metrics.ToMetrics()) + serverOpts = append(serverOpts, + grpc.ChainUnaryInterceptor(metrics.UnaryServerInterceptor(m)), + grpc.ChainStreamInterceptor(metrics.StreamServerInterceptor(m)), + ) + + go func() { + log.Printf("Metrics server listening on %s%s", cfg.Metrics.Addr, cfg.Metrics.Path) + if err := m.Serve(cfg.Metrics.Addr, cfg.Metrics.Path); err != nil { + log.Printf("Metrics server failed: %v", err) + } + }() + } + wsHandler := wshandler.NewHandler(store, subManager) - // Set public URLs for index page var grpcDisplay, httpDisplay, wsDisplay string - if *publicURL != "" { - // Use public URLs when behind reverse proxy (port 443) - grpcDisplay = *publicURL + ":443" - httpDisplay = "https://" + *publicURL - wsDisplay = "wss://" + *publicURL + if cfg.Server.PublicURL != "" { + grpcDisplay = cfg.Server.PublicURL + ":443" + httpDisplay = "https://" + cfg.Server.PublicURL + wsDisplay = "wss://" + cfg.Server.PublicURL } else { - // Use local addresses for development - grpcDisplay = *grpcAddr - httpDisplay = "http://" + *wsAddr - wsDisplay = "ws://" + *wsAddr + grpcDisplay = cfg.Server.GrpcAddr + httpDisplay = "http://" + cfg.Server.HttpAddr + wsDisplay = "ws://" + cfg.Server.HttpAddr } wsHandler.SetIndexData(grpcDisplay, httpDisplay, wsDisplay) mux.Handle("/", wsHandler) - grpcLis, err := net.Listen("tcp", *grpcAddr) + grpcLis, err := net.Listen("tcp", cfg.Server.GrpcAddr) if err != nil { log.Fatalf("failed to listen on gRPC port: %v", err) } - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer(serverOpts...) pb.RegisterNostrRelayServer(grpcServer, grpcHandler) httpServer := &http.Server{ - Addr: *wsAddr, - Handler: h2c.NewHandler(mux, &http2.Server{}), + Addr: cfg.Server.HttpAddr, + Handler: h2c.NewHandler(mux, &http2.Server{}), + ReadTimeout: cfg.Server.ReadTimeout, + WriteTimeout: cfg.Server.WriteTimeout, } - log.Printf("gRPC server listening on %s", *grpcAddr) - log.Printf("HTTP server listening on %s", *wsAddr) - log.Printf(" - Connect (gRPC-Web) at %s/nostr.v1.NostrRelay/*", *wsAddr) - log.Printf(" - WebSocket (Nostr) at %s/", *wsAddr) - log.Printf("Database: %s", *dbPath) + log.Printf("gRPC server listening on %s", cfg.Server.GrpcAddr) + log.Printf("HTTP server listening on %s", cfg.Server.HttpAddr) + log.Printf(" - Connect (gRPC-Web) at %s/nostr.v1.NostrRelay/*", cfg.Server.HttpAddr) + log.Printf(" - WebSocket (Nostr) at %s/", cfg.Server.HttpAddr) + log.Printf("Database: %s", cfg.Database.Path) + if cfg.Auth.Read.Enabled || cfg.Auth.Write.Enabled { + log.Printf("Auth: enabled (read=%v write=%v)", cfg.Auth.Read.Enabled, cfg.Auth.Write.Enabled) + } + if cfg.RateLimit.Enabled { + log.Printf("Rate limiting: enabled") + } sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/internal/auth/README.md b/internal/auth/README.md index de37010..98d1437 100644 --- a/internal/auth/README.md +++ b/internal/auth/README.md @@ -142,11 +142,11 @@ import ( // Create auth options authOpts := &auth.InterceptorOptions{ - Read: auth.OperationAuthConfig{ + Read: auth.AuthOperationConfig{ Enabled: true, // Require auth for reads AllowedNpubs: nil, // Accept any valid signature }, - Write: auth.OperationAuthConfig{ + Write: auth.AuthOperationConfig{ Enabled: true, AllowedNpubs: []string{"hex-pubkey-1", "hex-pubkey-2"}, // Whitelist }, diff --git a/internal/auth/auth_test.go b/internal/auth/auth_test.go index 7b0fa13..68c68f5 100644 --- a/internal/auth/auth_test.go +++ b/internal/auth/auth_test.go @@ -243,11 +243,11 @@ func TestValidateAuthFromContext(t *testing.T) { ctx := metadata.NewIncomingContext(context.Background(), md) opts := &InterceptorOptions{ - Read: OperationAuthConfig{ + Read: AuthOperationConfig{ Enabled: true, AllowedNpubs: nil, }, - Write: OperationAuthConfig{ + Write: AuthOperationConfig{ Enabled: true, AllowedNpubs: nil, }, diff --git a/internal/auth/interceptor.go b/internal/auth/interceptor.go index 42c2688..67450ce 100644 --- a/internal/auth/interceptor.go +++ b/internal/auth/interceptor.go @@ -18,28 +18,25 @@ const ( ) type InterceptorOptions struct { - Read OperationAuthConfig - Write OperationAuthConfig + Read AuthOperationConfig + Write AuthOperationConfig TimestampWindow int64 ValidatePayload bool SkipMethods []string } -// OperationAuthConfig configures auth for read or write operations. -// Three states: disabled (allow all), enabled with empty list (require auth), -// enabled with npubs (whitelist only). Npubs normalized to hex at load time. -type OperationAuthConfig struct { +type AuthOperationConfig struct { Enabled bool AllowedNpubs []string } func DefaultInterceptorOptions() *InterceptorOptions { return &InterceptorOptions{ - Read: OperationAuthConfig{ + Read: AuthOperationConfig{ Enabled: false, AllowedNpubs: nil, }, - Write: OperationAuthConfig{ + Write: AuthOperationConfig{ Enabled: false, AllowedNpubs: nil, }, @@ -154,7 +151,7 @@ func validateAuthFromContext(ctx context.Context, method string, opts *Intercept pubkey := ExtractPubkey(event) - var opConfig OperationAuthConfig + var opConfig AuthOperationConfig if isWriteMethod(method) { opConfig = opts.Write } else { diff --git a/internal/config/config.go b/internal/config/config.go index 36c8be5..dcceade 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,6 +7,9 @@ import ( "time" "gopkg.in/yaml.v3" + "northwest.io/muxstr/internal/auth" + "northwest.io/muxstr/internal/metrics" + "northwest.io/muxstr/internal/ratelimit" "northwest.io/nostr" ) @@ -33,18 +36,10 @@ type DatabaseConfig struct { } type AuthConfig struct { - Read AuthOperationConfig `yaml:"read"` - Write AuthOperationConfig `yaml:"write"` - TimestampWindow int64 `yaml:"timestamp_window"` - SkipMethods []string `yaml:"skip_methods"` -} - -// AuthOperationConfig configures auth for read or write operations. -// Three states: disabled (allow all), enabled with empty list (require auth), -// enabled with npubs (whitelist only). Npubs normalized to hex at load time. -type AuthOperationConfig struct { - Enabled bool `yaml:"enabled"` - AllowedNpubs []string `yaml:"allowed_npubs"` + Read auth.AuthOperationConfig `yaml:"read"` + Write auth.AuthOperationConfig `yaml:"write"` + TimestampWindow int64 `yaml:"timestamp_window"` + SkipMethods []string `yaml:"skip_methods"` } type RateLimitConfig struct { @@ -105,11 +100,11 @@ func Default() *Config { Path: "relay.db", }, Auth: AuthConfig{ - Read: AuthOperationConfig{ + Read: auth.AuthOperationConfig{ Enabled: false, AllowedNpubs: nil, }, - Write: AuthOperationConfig{ + Write: auth.AuthOperationConfig{ Enabled: false, AllowedNpubs: nil, }, @@ -349,3 +344,55 @@ func (c *Config) Save(filename string) error { return nil } + +func (r *RateLimitConfig) ToRateLimiter() *ratelimit.Config { + rlConfig := &ratelimit.Config{ + RequestsPerSecond: r.DefaultRPS, + BurstSize: r.DefaultBurst, + IPRequestsPerSecond: r.IPRPS, + IPBurstSize: r.IPBurst, + SkipMethods: r.SkipMethods, + SkipUsers: r.SkipUsers, + CleanupInterval: r.CleanupInterval, + MaxIdleTime: r.MaxIdleTime, + } + + if r.Methods != nil { + rlConfig.MethodLimits = make(map[string]ratelimit.MethodLimit, len(r.Methods)) + for method, limit := range r.Methods { + rlConfig.MethodLimits[method] = ratelimit.MethodLimit{ + RequestsPerSecond: limit.RPS, + BurstSize: limit.Burst, + } + } + } + + if r.Users != nil { + rlConfig.UserLimits = make(map[string]ratelimit.UserLimit, len(r.Users)) + for user, limit := range r.Users { + userLimit := ratelimit.UserLimit{ + RequestsPerSecond: limit.RPS, + BurstSize: limit.Burst, + } + if limit.Methods != nil { + userLimit.MethodLimits = make(map[string]ratelimit.MethodLimit, len(limit.Methods)) + for method, methodLimit := range limit.Methods { + userLimit.MethodLimits[method] = ratelimit.MethodLimit{ + RequestsPerSecond: methodLimit.RPS, + BurstSize: methodLimit.Burst, + } + } + } + rlConfig.UserLimits[user] = userLimit + } + } + + return rlConfig +} + +func (m *MetricsConfig) ToMetrics() *metrics.Config { + return &metrics.Config{ + Namespace: m.Namespace, + Subsystem: m.Subsystem, + } +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 3cb675f..9030d67 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -1,8 +1,11 @@ package metrics import ( + "net/http" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // Metrics holds all Prometheus metrics for the relay. @@ -280,3 +283,9 @@ const ( StatusRateLimited RequestStatus = "rate_limited" StatusInvalidRequest RequestStatus = "invalid_request" ) + +func (m *Metrics) Serve(addr, path string) error { + mux := http.NewServeMux() + mux.Handle(path, promhttp.Handler()) + return http.ListenAndServe(addr, mux) +} -- cgit v1.2.3