package main import ( "context" "flag" "log" "net" "net/http" "os" "os/signal" "syscall" "time" "connectrpc.com/connect" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "google.golang.org/grpc" pb "northwest.io/muxstr/api/nostr/v1" "northwest.io/muxstr/api/nostr/v1/nostrv1connect" "northwest.io/muxstr/internal/auth" "northwest.io/muxstr/internal/config" connecthandler "northwest.io/muxstr/internal/handler/connect" grpchandler "northwest.io/muxstr/internal/handler/grpc" wshandler "northwest.io/muxstr/internal/handler/websocket" "northwest.io/muxstr/internal/metrics" "northwest.io/muxstr/internal/ratelimit" "northwest.io/muxstr/internal/storage" "northwest.io/muxstr/internal/subscription" ) func main() { configFile := flag.String("config", "", "Path to config file (optional)") flag.Parse() cfg, err := config.Load(*configFile) if err != nil { log.Fatalf("failed to load config: %v", err) } store, err := storage.New(cfg.Database.Path) if err != nil { log.Fatalf("failed to create storage: %v", err) } defer store.Close() subManager := subscription.NewManager() grpcHandler := grpchandler.NewServer(store) grpcHandler.SetSubscriptionManager(subManager) connectHandler := connecthandler.NewHandler(grpcHandler) mux := http.NewServeMux() path, handler := nostrv1connect.NewNostrRelayHandler(connectHandler, connect.WithInterceptors()) mux.Handle(path, handler) var unaryInterceptors []grpc.UnaryServerInterceptor var streamInterceptors []grpc.StreamServerInterceptor var m *metrics.Metrics if cfg.Metrics.Enabled { m = metrics.New(cfg.Metrics.ToMetrics()) unaryInterceptors = append(unaryInterceptors, metrics.UnaryServerInterceptor(m)) streamInterceptors = append(streamInterceptors, metrics.StreamServerInterceptor(m)) mux.Handle("/metrics", m.PrometheusHandler()) mux.Handle("/dashboard", m.DashboardHandler()) } if cfg.Auth.Read.Enabled || cfg.Auth.Write.Enabled { authOpts := &auth.InterceptorOptions{ Read: cfg.Auth.Read, Write: cfg.Auth.Write, TimestampWindow: cfg.Auth.TimestampWindow, SkipMethods: cfg.Auth.SkipMethods, } unaryInterceptors = append(unaryInterceptors, auth.NostrUnaryInterceptor(authOpts)) streamInterceptors = append(streamInterceptors, auth.NostrStreamInterceptor(authOpts)) } if cfg.RateLimit.Enabled { limiter := ratelimit.New(cfg.RateLimit.ToRateLimiter()) unaryInterceptors = append(unaryInterceptors, ratelimit.UnaryInterceptor(limiter)) streamInterceptors = append(streamInterceptors, ratelimit.StreamInterceptor(limiter)) } var serverOpts []grpc.ServerOption if len(unaryInterceptors) > 0 { serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryInterceptors...)) } if len(streamInterceptors) > 0 { serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(streamInterceptors...)) } wsHandler := wshandler.NewHandler(store, subManager) if m != nil { wsHandler.SetMetrics(m) // Update storage stats periodically go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() // Update immediately on start if stats, err := store.GetStats(context.Background()); err == nil { m.UpdateStorageStats(stats.EventCount, stats.DBSizeBytes) } for range ticker.C { if stats, err := store.GetStats(context.Background()); err == nil { m.UpdateStorageStats(stats.EventCount, stats.DBSizeBytes) } } }() } if cfg.Auth.Read.Enabled || cfg.Auth.Write.Enabled { wsHandler.SetAuthConfig(&wshandler.AuthConfig{ ReadEnabled: cfg.Auth.Read.Enabled, WriteEnabled: cfg.Auth.Write.Enabled, ReadAllowedPubkeys: cfg.Auth.Read.AllowedNpubs, WriteAllowedPubkeys: cfg.Auth.Write.AllowedNpubs, }) } var grpcDisplay, httpDisplay, wsDisplay string if cfg.Server.PublicURL != "" { grpcDisplay = cfg.Server.PublicURL + ":443" httpDisplay = "https://" + cfg.Server.PublicURL wsDisplay = "wss://" + cfg.Server.PublicURL } else { grpcDisplay = cfg.Server.GrpcAddr httpDisplay = "http://" + cfg.Server.HttpAddr wsDisplay = "ws://" + cfg.Server.HttpAddr } wsHandler.SetIndexData(grpcDisplay, httpDisplay, wsDisplay) mux.Handle("/", wsHandler) grpcLis, err := net.Listen("tcp", cfg.Server.GrpcAddr) if err != nil { log.Fatalf("failed to listen on gRPC port: %v", err) } grpcServer := grpc.NewServer(serverOpts...) pb.RegisterNostrRelayServer(grpcServer, grpcHandler) httpServer := &http.Server{ Addr: cfg.Server.HttpAddr, Handler: h2c.NewHandler(mux, &http2.Server{}), ReadTimeout: cfg.Server.ReadTimeout, WriteTimeout: cfg.Server.WriteTimeout, } log.Printf("gRPC server listening on %s", cfg.Server.GrpcAddr) log.Printf("HTTP server listening on %s", cfg.Server.HttpAddr) log.Printf(" - Connect (gRPC-Web) at %s/nostr.v1.NostrRelay/*", cfg.Server.HttpAddr) log.Printf(" - WebSocket (Nostr) at %s/", cfg.Server.HttpAddr) if cfg.Metrics.Enabled { log.Printf(" - Metrics dashboard at %s/dashboard", cfg.Server.HttpAddr) log.Printf(" - Prometheus metrics at %s/metrics", cfg.Server.HttpAddr) } log.Printf("Database: %s", cfg.Database.Path) if cfg.Auth.Read.Enabled || cfg.Auth.Write.Enabled { log.Printf("Auth: enabled (read=%v write=%v)", cfg.Auth.Read.Enabled, cfg.Auth.Write.Enabled) } if cfg.RateLimit.Enabled { log.Printf("Rate limiting: enabled") } sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) go func() { <-sigChan log.Println("Shutting down...") grpcServer.GracefulStop() httpServer.Shutdown(context.Background()) }() go func() { if err := grpcServer.Serve(grpcLis); err != nil { log.Fatalf("gRPC server failed: %v", err) } }() if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("WebSocket server failed: %v", err) } }