From a72bc68a52721d4ed9ace43d0989a528d09f0753 Mon Sep 17 00:00:00 2001 From: Ondrej Belusky Date: Wed, 25 Feb 2026 08:36:51 +0100 Subject: [PATCH] nats-upload: use cobra and add clean subcommand --- go.mod | 14 ++++ go.sum | 32 ++++++++ main.go | 241 +++++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 214 insertions(+), 73 deletions(-) diff --git a/go.mod b/go.mod index 5fda512..5705983 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,23 @@ require ( ) require ( + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.18.4 // indirect github.com/nats-io/nkeys v0.4.15 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/cobra v1.10.2 // indirect + github.com/spf13/pflag v1.0.10 // indirect + github.com/spf13/viper v1.21.0 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.48.0 // indirect golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.34.0 // indirect ) diff --git a/go.sum b/go.sum index 02b5035..40b8a68 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,10 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE= @@ -6,9 +13,34 @@ github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4= github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= +github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U= +github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= +github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= +github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= +github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= +github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= +github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= +github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go index fa64808..4e12670 100644 --- a/main.go +++ b/main.go @@ -3,66 +3,162 @@ package main import ( "context" "errors" - "flag" "fmt" "log" "os" "path/filepath" "sort" - "strconv" "strings" + "time" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" "golang.org/x/mod/semver" ) -func main() { - var ( - natsURL = flag.String("nats", getEnv("INPUT_NATS_URL", "nats://localhost:4222"), "NATS server URL") - bucketName = flag.String("bucket", getEnv("INPUT_BUCKET", "binaries"), "Object store bucket name") - directory = flag.String("dir", getEnv("INPUT_SOURCE", "upload"), "Directory containing binaries to upload") - prefix = flag.String("prefix", getEnv("INPUT_STRIP_PREFIX", ""), "Prefix to strip from paths (like 'upload/')") - binaryName = flag.String("binary", getEnv("INPUT_BINARY", ""), "Binary name (defaults to first binary found)") - notifyTopic = flag.String("notify", getEnv("INPUT_NOTIFY_TOPIC", "binaries.update"), "NATS topic to publish update notification") - skipNotify = flag.Bool("skip-notify", getEnvBool("INPUT_SKIP_NOTIFY", false), "Skip publishing update notification") - cleanup = flag.Int("cleanup", getEnvInt("INPUT_CLEANUP", 0), "Keep only N most recent versions (0 disables cleanup)") - cleanupAll = flag.Bool("cleanup-all", getEnvBool("INPUT_CLEANUP_ALL", false), "Cleanup all binaries, not just current one") - justClean = flag.Bool("just-clean", getEnvBool("INPUT_JUST_CLEAN", false), "Dont upload, just cleanup old versions") - ) - flag.Parse() +type Config struct { + NatsURL string `mapstructure:"nats"` + BucketName string `mapstructure:"bucket"` + Directory string `mapstructure:"dir"` + Prefix string `mapstructure:"prefix"` + BinaryName string `mapstructure:"binary"` + NotifyTopic string `mapstructure:"notify"` + SkipNotify bool `mapstructure:"skip-notify"` + Cleanup int `mapstructure:"cleanup"` + CleanupAll bool `mapstructure:"cleanup-all"` +} - if *directory == "" && *cleanup == 0 && *justClean == false { - log.Fatal("Directory path is required or cleanup must be enabled") +var rootCmd = &cobra.Command{ + Use: "nats-upload", + Short: "Upload binaries to NATS object store and cleanup old versions", + RunE: func(cmd *cobra.Command, args []string) error { + var cfg Config + if err := viper.Unmarshal(&cfg); err != nil { + return fmt.Errorf("failed to unmarshal config: %w", err) + } + if cfg.Directory == "" && cfg.Cleanup == 0 { + return errors.New("directory path is required or cleanup must be enabled") + } + return runUploadAndCleanup(cmd.Context(), &cfg) + }, +} + +var cleanCmd = &cobra.Command{ + Use: "clean", + Short: "Cleanup old versions in NATS object store", + RunE: func(cmd *cobra.Command, args []string) error { + var cfg Config + if err := viper.Unmarshal(&cfg); err != nil { + return fmt.Errorf("failed to unmarshal config: %w", err) + } + if cfg.Cleanup == 0 { + return errors.New("cleanup count must be greater than 0") + } + return runCleanupOnly(cmd.Context(), &cfg) + }, +} + +func init() { + cobra.OnInitialize(initConfig) + + rootCmd.PersistentFlags().String("nats", "nats://localhost:4222", "NATS server URL") + rootCmd.PersistentFlags().String("bucket", "binaries", "Object store bucket name") + rootCmd.PersistentFlags().String("binary", "", "Binary name (defaults to first binary found)") + rootCmd.PersistentFlags().Int("cleanup", 2, "Keep only N most recent versions (0 disables cleanup)") + rootCmd.PersistentFlags().Bool("cleanup-all", false, "Cleanup all binaries, not just current one") + rootCmd.PersistentFlags().Bool("clean-all", false, "Alias for --cleanup-all") + + rootCmd.Flags().String("dir", "upload", "Directory containing binaries to upload") + rootCmd.Flags().String("prefix", "", "Prefix to strip from paths (like 'upload/')") + rootCmd.Flags().String("notify", "binaries.update", "NATS topic to publish update notification") + rootCmd.Flags().Bool("skip-notify", false, "Skip publishing update notification") +} + +func bindPFlag(fs *pflag.FlagSet, key string, flagNames ...string) { + name := key + if len(flagNames) > 0 { + name = flagNames[0] + } + if err := viper.BindPFlag(key, fs.Lookup(name)); err != nil { + log.Fatalf("error binding %s flag: %v", key, err) + } +} + +func init() { + rootPersistentFlags := rootCmd.PersistentFlags() + for _, name := range []string{"nats", "bucket", "binary", "cleanup", "cleanup-all"} { + bindPFlag(rootPersistentFlags, name) } - ctx := context.Background() + rootFlags := rootCmd.Flags() + for _, name := range []string{"dir", "prefix", "notify", "skip-notify"} { + bindPFlag(rootFlags, name) + } - nc, err := nats.Connect(*natsURL) + rootCmd.AddCommand(cleanCmd) +} + +func initConfig() { + viper.SetEnvPrefix("INPUT") + viper.AutomaticEnv() + + viper.RegisterAlias("nats_url", "nats") + viper.RegisterAlias("source", "dir") + viper.RegisterAlias("strip_prefix", "prefix") + viper.RegisterAlias("notify_topic", "notify") + viper.RegisterAlias("clean_all", "cleanup-all") +} + +type NATSClient struct { + Conn *nats.Conn + JS jetstream.JetStream + Store jetstream.ObjectStore +} + +func getNATSConnection(ctx context.Context, cfg *Config) (*NATSClient, error) { + nc, err := nats.Connect(cfg.NatsURL) if err != nil { - log.Fatalf("Failed to connect to NATS: %v", err) + return nil, fmt.Errorf("failed to connect to NATS: %w", err) } - defer nc.Close() js, err := jetstream.New(nc) if err != nil { - log.Fatalf("Failed to create JetStream context: %v", err) + nc.Close() + return nil, fmt.Errorf("failed to create JetStream context: %w", err) } - store, err := js.ObjectStore(ctx, *bucketName) + store, err := js.ObjectStore(ctx, cfg.BucketName) if err != nil { store, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{ - Bucket: *bucketName, + Bucket: cfg.BucketName, Description: "Binary storage for self-update", }) if err != nil { - log.Fatalf("Failed to get/create object store: %v", err) + nc.Close() + return nil, fmt.Errorf("failed to get/create object store: %w", err) } - log.Printf("Created object store: %s", *bucketName) + log.Printf("Created object store: %s", cfg.BucketName) } - if *directory != "" && *justClean == false { - err = filepath.Walk(*directory, func(path string, info os.FileInfo, err error) error { + return &NATSClient{ + Conn: nc, + JS: js, + Store: store, + }, nil +} + +func runUploadAndCleanup(ctx context.Context, cfg *Config) error { + client, err := getNATSConnection(ctx, cfg) + if err != nil { + return err + } + defer client.Conn.Close() + + if cfg.Directory != "" { + err := filepath.Walk(cfg.Directory, func(path string, info os.FileInfo, err error) error { if err != nil { return err } @@ -76,28 +172,28 @@ func main() { return fmt.Errorf("failed to read %s: %w", path, err) } - relPath, err := filepath.Rel(*directory, path) + relPath, err := filepath.Rel(cfg.Directory, path) if err != nil { return fmt.Errorf("failed to get relative path: %w", err) } objectKey := relPath - if *prefix != "" { - objectKey = strings.TrimPrefix(relPath, *prefix) + if cfg.Prefix != "" { + objectKey = strings.TrimPrefix(relPath, cfg.Prefix) } objectKey = filepath.ToSlash(objectKey) - if *binaryName == "" { + if cfg.BinaryName == "" { parts := strings.Split(objectKey, "/") if len(parts) >= 2 { - *binaryName = parts[0] + cfg.BinaryName = parts[0] } } log.Printf("Uploading %s as %s (%d bytes)", path, objectKey, len(data)) - _, err = store.PutBytes(ctx, objectKey, data) + _, err = client.Store.PutBytes(ctx, objectKey, data) if err != nil { return fmt.Errorf("failed to upload %s: %w", path, err) } @@ -107,64 +203,53 @@ func main() { }) if err != nil { - log.Fatalf("Failed to upload files: %v", err) + return fmt.Errorf("failed to upload files: %w", err) } - log.Printf("Successfully uploaded all files from %s to NATS object store '%s'", *directory, *bucketName) + log.Printf("Successfully uploaded all files from %s to NATS object store '%s'", cfg.Directory, cfg.BucketName) } - if *cleanup > 0 { - log.Printf("Cleaning up old versions, keeping %d most recent", *cleanup) - err = cleanupOldVersions(ctx, store, *binaryName, *cleanup, *cleanupAll) + if cfg.Cleanup > 0 { + log.Printf("Cleaning up old versions, keeping %d most recent", cfg.Cleanup) + err := cleanupOldVersions(ctx, client.Store, cfg.BinaryName, cfg.Cleanup, cfg.CleanupAll) if err != nil { - log.Fatalf("Failed to cleanup old versions: %v", err) + return fmt.Errorf("failed to cleanup old versions: %w", err) } } - if !*skipNotify && *notifyTopic != "" { - log.Printf("Publishing update notification to topic: %s", *notifyTopic) + if !cfg.SkipNotify && cfg.NotifyTopic != "" { + log.Printf("Publishing update notification to topic: %s", cfg.NotifyTopic) - message := fmt.Sprintf("binaries updated in %s", *bucketName) - err = nc.Publish(*notifyTopic, []byte(message)) + message := fmt.Sprintf("binaries updated in %s", cfg.BucketName) + err := client.Conn.Publish(cfg.NotifyTopic, []byte(message)) if err != nil { - log.Fatalf("Failed to publish notification: %v", err) + return fmt.Errorf("failed to publish notification: %w", err) } // Flush to ensure message is sent - err = nc.Flush() + err = client.Conn.Flush() if err != nil { - log.Fatalf("Failed to flush notification: %v", err) + return fmt.Errorf("failed to flush notification: %w", err) } log.Printf("✓ Published update notification") } + return nil } -func getEnv(key, defaultValue string) string { - if value := os.Getenv(key); value != "" { - return value +func runCleanupOnly(ctx context.Context, cfg *Config) error { + client, err := getNATSConnection(ctx, cfg) + if err != nil { + return err } - return defaultValue -} + defer client.Conn.Close() -func getEnvBool(key string, defaultValue bool) bool { - if value := os.Getenv(key); value != "" { - b, err := strconv.ParseBool(value) - if err == nil { - return b - } + log.Printf("Cleaning up old versions, keeping %d most recent", cfg.Cleanup) + err = cleanupOldVersions(ctx, client.Store, cfg.BinaryName, cfg.Cleanup, cfg.CleanupAll) + if err != nil { + return fmt.Errorf("failed to cleanup old versions: %w", err) } - return defaultValue -} - -func getEnvInt(key string, defaultValue int) int { - if value := os.Getenv(key); value != "" { - i, err := strconv.Atoi(value) - if err == nil { - return i - } - } - return defaultValue + return nil } func cleanupOldVersions(ctx context.Context, store jetstream.ObjectStore, currentBinary string, keepCount int, cleanAll bool) error { @@ -184,12 +269,12 @@ func cleanupOldVersions(ctx context.Context, store jetstream.ObjectStore, curren continue } - binaryName := parts[0] + binName := parts[0] arch := parts[1] - pathKey := binaryName + "/" + arch + pathKey := binName + "/" + arch // If not cleaning all and this isn't the current binary, skip - if !cleanAll && currentBinary != "" && binaryName != currentBinary { + if !cleanAll && currentBinary != "" && binName != currentBinary { continue } @@ -239,3 +324,13 @@ func cleanupOldVersions(ctx context.Context, store jetstream.ObjectStore, curren return nil } + +func main() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) + defer cancel() + + if err := rootCmd.ExecuteContext(ctx); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +}