package main import ( "context" "errors" "flag" "fmt" "log" "os" "path/filepath" "sort" "strconv" "strings" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "golang.org/x/mod/semver" ) func main() { var ( natsURL = flag.String("nats", getEnv("INPUT_NATS_URL", "nats://localhost:4222"), "NATS server URL") bucketName = flag.String("bucket", getEnv("INPUT_BUCKET", "binaries"), "Object store bucket name") directory = flag.String("dir", getEnv("INPUT_SOURCE", "upload"), "Directory containing binaries to upload") prefix = flag.String("prefix", getEnv("INPUT_STRIP_PREFIX", ""), "Prefix to strip from paths (like 'upload/')") binaryName = flag.String("binary", getEnv("INPUT_BINARY", ""), "Binary name (defaults to first binary found)") notifyTopic = flag.String("notify", getEnv("INPUT_NOTIFY_TOPIC", "binaries.update"), "NATS topic to publish update notification") skipNotify = flag.Bool("skip-notify", getEnvBool("INPUT_SKIP_NOTIFY", false), "Skip publishing update notification") cleanup = flag.Int("cleanup", getEnvInt("INPUT_CLEANUP", 0), "Keep only N most recent versions (0 disables cleanup)") cleanupAll = flag.Bool("cleanup-all", getEnvBool("INPUT_CLEANUP_ALL", false), "Cleanup all binaries, not just current one") justClean = flag.Bool("just-clean", getEnvBool("INPUT_JUST_CLEAN", false), "Dont upload, just cleanup old versions") ) flag.Parse() if *directory == "" && *cleanup == 0 && *justClean == false { log.Fatal("Directory path is required or cleanup must be enabled") } ctx := context.Background() nc, err := nats.Connect(*natsURL) if err != nil { log.Fatalf("Failed to connect to NATS: %v", err) } defer nc.Close() js, err := jetstream.New(nc) if err != nil { log.Fatalf("Failed to create JetStream context: %v", err) } store, err := js.ObjectStore(ctx, *bucketName) if err != nil { store, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{ Bucket: *bucketName, Description: "Binary storage for self-update", }) if err != nil { log.Fatalf("Failed to get/create object store: %v", err) } log.Printf("Created object store: %s", *bucketName) } if *directory != "" && *justClean == false { err = filepath.Walk(*directory, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } data, err := os.ReadFile(path) if err != nil { return fmt.Errorf("failed to read %s: %w", path, err) } relPath, err := filepath.Rel(*directory, path) if err != nil { return fmt.Errorf("failed to get relative path: %w", err) } objectKey := relPath if *prefix != "" { objectKey = strings.TrimPrefix(relPath, *prefix) } objectKey = filepath.ToSlash(objectKey) if *binaryName == "" { parts := strings.Split(objectKey, "/") if len(parts) >= 2 { *binaryName = parts[0] } } log.Printf("Uploading %s as %s (%d bytes)", path, objectKey, len(data)) _, err = store.PutBytes(ctx, objectKey, data) if err != nil { return fmt.Errorf("failed to upload %s: %w", path, err) } log.Printf("✓ Uploaded %s", objectKey) return nil }) if err != nil { log.Fatalf("Failed to upload files: %v", err) } log.Printf("Successfully uploaded all files from %s to NATS object store '%s'", *directory, *bucketName) } if *cleanup > 0 { log.Printf("Cleaning up old versions, keeping %d most recent", *cleanup) err = cleanupOldVersions(ctx, store, *binaryName, *cleanup, *cleanupAll) if err != nil { log.Fatalf("Failed to cleanup old versions: %v", err) } } if !*skipNotify && *notifyTopic != "" { log.Printf("Publishing update notification to topic: %s", *notifyTopic) message := fmt.Sprintf("binaries updated in %s", *bucketName) err = nc.Publish(*notifyTopic, []byte(message)) if err != nil { log.Fatalf("Failed to publish notification: %v", err) } // Flush to ensure message is sent err = nc.Flush() if err != nil { log.Fatalf("Failed to flush notification: %v", err) } log.Printf("✓ Published update notification") } } func getEnv(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value } return defaultValue } func getEnvBool(key string, defaultValue bool) bool { if value := os.Getenv(key); value != "" { b, err := strconv.ParseBool(value) if err == nil { return b } } return defaultValue } func getEnvInt(key string, defaultValue int) int { if value := os.Getenv(key); value != "" { i, err := strconv.Atoi(value) if err == nil { return i } } return defaultValue } func cleanupOldVersions(ctx context.Context, store jetstream.ObjectStore, currentBinary string, keepCount int, cleanAll bool) error { objects, err := store.List(ctx) if err != nil { return fmt.Errorf("failed to list objects: %w", err) } // Group objects by binary/architecture path // Expected structure: binary/arch/version versionsByPath := make(map[string][]*jetstream.ObjectInfo) for _, obj := range objects { parts := strings.Split(obj.Name, "/") if len(parts) < 3 { // Not a version path, skip continue } binaryName := parts[0] arch := parts[1] pathKey := binaryName + "/" + arch // If not cleaning all and this isn't the current binary, skip if !cleanAll && currentBinary != "" && binaryName != currentBinary { continue } versionsByPath[pathKey] = append(versionsByPath[pathKey], obj) } // For each binary/arch combination, keep only the most recent N versions for pathKey, versions := range versionsByPath { if len(versions) <= keepCount { log.Printf("Path %s has %d versions, keeping all", pathKey, len(versions)) continue } // Sort by semantic version (newest first) sort.Slice(versions, func(i, j int) bool { // Extract version from path: binary/arch/version versionI := filepath.Base(versions[i].Name) versionJ := filepath.Base(versions[j].Name) // Ensure versions start with 'v' for semver.Compare if !strings.HasPrefix(versionI, "v") { versionI = "v" + versionI } if !strings.HasPrefix(versionJ, "v") { versionJ = "v" + versionJ } // semver.Compare returns -1, 0, or 1 // We want newest first, so reverse the comparison return semver.Compare(versionI, versionJ) > 0 }) // Delete old versions (everything after keepCount) toDelete := versions[keepCount:] log.Printf("Path %s has %d versions, deleting %d old versions", pathKey, len(versions), len(toDelete)) for _, obj := range toDelete { version := filepath.Base(obj.Name) log.Printf("Deleting old version: %s (version: %s)", obj.Name, version) err := store.Delete(ctx, obj.Name) if err != nil && !errors.Is(err, jetstream.ErrObjectNotFound) { return fmt.Errorf("failed to delete %s: %w", obj.Name, err) } log.Printf("✓ Deleted %s", obj.Name) } } return nil }