nats-exec: move out
This commit is contained in:
238
main.go
Normal file
238
main.go
Normal file
@@ -0,0 +1,238 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
"golang.org/x/mod/semver"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
natsURL = flag.String("nats", getEnv("INPUT_NATS_URL", "nats://localhost:4222"), "NATS server URL")
|
||||
bucketName = flag.String("bucket", getEnv("INPUT_BUCKET", "binaries"), "Object store bucket name")
|
||||
directory = flag.String("dir", getEnv("INPUT_SOURCE", "upload"), "Directory containing binaries to upload")
|
||||
prefix = flag.String("prefix", getEnv("INPUT_STRIP_PREFIX", ""), "Prefix to strip from paths (like 'upload/')")
|
||||
binaryName = flag.String("binary", getEnv("INPUT_BINARY", ""), "Binary name (defaults to first binary found)")
|
||||
notifyTopic = flag.String("notify", getEnv("INPUT_NOTIFY_TOPIC", "binaries.update"), "NATS topic to publish update notification")
|
||||
skipNotify = flag.Bool("skip-notify", getEnvBool("INPUT_SKIP_NOTIFY", false), "Skip publishing update notification")
|
||||
cleanup = flag.Int("cleanup", getEnvInt("INPUT_CLEANUP", 0), "Keep only N most recent versions (0 disables cleanup)")
|
||||
cleanupAll = flag.Bool("cleanup-all", getEnvBool("INPUT_CLEANUP_ALL", false), "Cleanup all binaries, not just current one")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
if *directory == "" {
|
||||
log.Fatal("Directory path is required")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
nc, err := nats.Connect(*natsURL)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to NATS: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create JetStream context: %v", err)
|
||||
}
|
||||
|
||||
store, err := js.ObjectStore(ctx, *bucketName)
|
||||
if err != nil {
|
||||
store, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{
|
||||
Bucket: *bucketName,
|
||||
Description: "Binary storage for self-update",
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to get/create object store: %v", err)
|
||||
}
|
||||
log.Printf("Created object store: %s", *bucketName)
|
||||
}
|
||||
|
||||
err = filepath.Walk(*directory, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read %s: %w", path, err)
|
||||
}
|
||||
|
||||
relPath, err := filepath.Rel(*directory, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get relative path: %w", err)
|
||||
}
|
||||
|
||||
objectKey := relPath
|
||||
if *prefix != "" {
|
||||
objectKey = strings.TrimPrefix(relPath, *prefix)
|
||||
}
|
||||
|
||||
objectKey = filepath.ToSlash(objectKey)
|
||||
|
||||
if *binaryName == "" {
|
||||
parts := strings.Split(objectKey, "/")
|
||||
if len(parts) >= 2 {
|
||||
*binaryName = parts[0]
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Uploading %s as %s (%d bytes)", path, objectKey, len(data))
|
||||
|
||||
_, err = store.PutBytes(ctx, objectKey, data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload %s: %w", path, err)
|
||||
}
|
||||
|
||||
log.Printf("✓ Uploaded %s", objectKey)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to upload files: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("Successfully uploaded all files from %s to NATS object store '%s'", *directory, *bucketName)
|
||||
|
||||
if *cleanup > 0 {
|
||||
log.Printf("Cleaning up old versions, keeping %d most recent", *cleanup)
|
||||
err = cleanupOldVersions(ctx, store, *binaryName, *cleanup, *cleanupAll)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to cleanup old versions: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !*skipNotify && *notifyTopic != "" {
|
||||
log.Printf("Publishing update notification to topic: %s", *notifyTopic)
|
||||
|
||||
message := fmt.Sprintf("binaries updated in %s", *bucketName)
|
||||
err = nc.Publish(*notifyTopic, []byte(message))
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to publish notification: %v", err)
|
||||
}
|
||||
|
||||
// Flush to ensure message is sent
|
||||
err = nc.Flush()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to flush notification: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("✓ Published update notification")
|
||||
}
|
||||
}
|
||||
|
||||
func getEnv(key, defaultValue string) string {
|
||||
if value := os.Getenv(key); value != "" {
|
||||
return value
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func getEnvBool(key string, defaultValue bool) bool {
|
||||
if value := os.Getenv(key); value != "" {
|
||||
b, err := strconv.ParseBool(value)
|
||||
if err == nil {
|
||||
return b
|
||||
}
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func getEnvInt(key string, defaultValue int) int {
|
||||
if value := os.Getenv(key); value != "" {
|
||||
i, err := strconv.Atoi(value)
|
||||
if err == nil {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func cleanupOldVersions(ctx context.Context, store jetstream.ObjectStore, currentBinary string, keepCount int, cleanAll bool) error {
|
||||
objects, err := store.List(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list objects: %w", err)
|
||||
}
|
||||
|
||||
// Group objects by binary/architecture path
|
||||
// Expected structure: binary/arch/version
|
||||
versionsByPath := make(map[string][]*jetstream.ObjectInfo)
|
||||
|
||||
for _, obj := range objects {
|
||||
parts := strings.Split(obj.Name, "/")
|
||||
if len(parts) < 3 {
|
||||
// Not a version path, skip
|
||||
continue
|
||||
}
|
||||
|
||||
binaryName := parts[0]
|
||||
arch := parts[1]
|
||||
pathKey := binaryName + "/" + arch
|
||||
|
||||
// If not cleaning all and this isn't the current binary, skip
|
||||
if !cleanAll && currentBinary != "" && binaryName != currentBinary {
|
||||
continue
|
||||
}
|
||||
|
||||
versionsByPath[pathKey] = append(versionsByPath[pathKey], obj)
|
||||
}
|
||||
|
||||
// For each binary/arch combination, keep only the most recent N versions
|
||||
for pathKey, versions := range versionsByPath {
|
||||
if len(versions) <= keepCount {
|
||||
log.Printf("Path %s has %d versions, keeping all", pathKey, len(versions))
|
||||
continue
|
||||
}
|
||||
|
||||
// Sort by semantic version (newest first)
|
||||
sort.Slice(versions, func(i, j int) bool {
|
||||
// Extract version from path: binary/arch/version
|
||||
versionI := filepath.Base(versions[i].Name)
|
||||
versionJ := filepath.Base(versions[j].Name)
|
||||
|
||||
// Ensure versions start with 'v' for semver.Compare
|
||||
if !strings.HasPrefix(versionI, "v") {
|
||||
versionI = "v" + versionI
|
||||
}
|
||||
if !strings.HasPrefix(versionJ, "v") {
|
||||
versionJ = "v" + versionJ
|
||||
}
|
||||
|
||||
// semver.Compare returns -1, 0, or 1
|
||||
// We want newest first, so reverse the comparison
|
||||
return semver.Compare(versionI, versionJ) > 0
|
||||
})
|
||||
|
||||
// Delete old versions (everything after keepCount)
|
||||
toDelete := versions[keepCount:]
|
||||
log.Printf("Path %s has %d versions, deleting %d old versions", pathKey, len(versions), len(toDelete))
|
||||
|
||||
for _, obj := range toDelete {
|
||||
version := filepath.Base(obj.Name)
|
||||
log.Printf("Deleting old version: %s (version: %s)", obj.Name, version)
|
||||
err := store.Delete(ctx, obj.Name)
|
||||
if err != nil && !errors.Is(err, jetstream.ErrObjectNotFound) {
|
||||
return fmt.Errorf("failed to delete %s: %w", obj.Name, err)
|
||||
}
|
||||
log.Printf("✓ Deleted %s", obj.Name)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user