added nats-exec

This commit is contained in:
2026-02-21 12:56:58 +01:00
parent 7e6e18e9fa
commit 78d1e44add
5 changed files with 176 additions and 1 deletions

146
cmd/nats-exec/main.go Normal file
View File

@@ -0,0 +1,146 @@
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"runtime"
"strings"
"syscall"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"golang.org/x/mod/semver"
)
func main() {
if len(os.Args) < 2 {
_, _ = fmt.Fprintf(os.Stderr, "Usage: %s <binary-name> [args...]\n", os.Args[0])
os.Exit(1)
}
binaryName := os.Args[1]
remainingArgs := os.Args[2:]
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
bucketName := getEnv("NATS_BUCKET", "binaries")
cacheDir := getEnv("NATS_CACHE_DIR", filepath.Join(os.TempDir(), "nats-exec-cache"))
ctx := context.Background()
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("Failed to create JetStream context: %v", err)
}
store, err := js.ObjectStore(ctx, bucketName)
if err != nil {
log.Fatalf("Failed to get object store %s: %v", bucketName, err)
}
// Format: binary/arch/version
arch := runtime.GOARCH
objects, err := store.List(ctx)
if err != nil {
log.Fatalf("Failed to list objects: %v", err)
}
var latestVersion string
var latestKey string
for _, info := range objects {
// Key structure: <binaryName>/<arch>/<version>
parts := strings.Split(info.Name, "/")
if len(parts) != 3 {
continue
}
if parts[0] != binaryName {
continue
}
if parts[1] != arch {
continue
}
version := parts[2]
if !strings.HasPrefix(version, "v") {
version = "v" + version
}
if semver.IsValid(version) {
if latestVersion == "" || semver.Compare(version, latestVersion) > 0 {
latestVersion = version
latestKey = info.Name
}
}
}
if latestKey == "" {
log.Fatalf("No version of %s found for arch %s", binaryName, arch)
}
localPath := filepath.Join(cacheDir, latestKey)
if _, err := os.Stat(localPath); os.IsNotExist(err) {
log.Printf("Downloading %s version %s...", binaryName, latestVersion)
if err := downloadBinary(ctx, store, latestKey, localPath); err != nil {
log.Fatalf("Failed to download binary: %v", err)
}
} else {
log.Printf("Using cached %s version %s", binaryName, latestVersion)
}
if err := os.Chmod(localPath, 0755); err != nil {
log.Fatalf("Failed to make binary executable: %v", err)
}
fullPath, err := filepath.Abs(localPath)
if err != nil {
log.Fatalf("Failed to get absolute path: %v", err)
}
env := os.Environ()
args := append([]string{fullPath}, remainingArgs...)
err = syscall.Exec(fullPath, args, env)
if err != nil {
log.Fatalf("Failed to exec %s: %v", fullPath, err)
}
}
func downloadBinary(ctx context.Context, store jetstream.ObjectStore, key, localPath string) error {
if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil {
return err
}
obj, err := store.Get(ctx, key)
if err != nil {
return err
}
f, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(f, obj)
return err
}
func getEnv(key, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}

238
cmd/nats-upload/main.go Normal file
View File

@@ -0,0 +1,238 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"golang.org/x/mod/semver"
)
func main() {
var (
natsURL = flag.String("nats", getEnv("INPUT_NATS_URL", "nats://localhost:4222"), "NATS server URL")
bucketName = flag.String("bucket", getEnv("INPUT_BUCKET", "binaries"), "Object store bucket name")
directory = flag.String("dir", getEnv("INPUT_SOURCE", "upload"), "Directory containing binaries to upload")
prefix = flag.String("prefix", getEnv("INPUT_STRIP_PREFIX", ""), "Prefix to strip from paths (like 'upload/')")
binaryName = flag.String("binary", getEnv("INPUT_BINARY", ""), "Binary name (defaults to first binary found)")
notifyTopic = flag.String("notify", getEnv("INPUT_NOTIFY_TOPIC", "binaries.update"), "NATS topic to publish update notification")
skipNotify = flag.Bool("skip-notify", getEnvBool("INPUT_SKIP_NOTIFY", false), "Skip publishing update notification")
cleanup = flag.Int("cleanup", getEnvInt("INPUT_CLEANUP", 0), "Keep only N most recent versions (0 disables cleanup)")
cleanupAll = flag.Bool("cleanup-all", getEnvBool("INPUT_CLEANUP_ALL", false), "Cleanup all binaries, not just current one")
)
flag.Parse()
if *directory == "" {
log.Fatal("Directory path is required")
}
ctx := context.Background()
nc, err := nats.Connect(*natsURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("Failed to create JetStream context: %v", err)
}
store, err := js.ObjectStore(ctx, *bucketName)
if err != nil {
store, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{
Bucket: *bucketName,
Description: "Binary storage for self-update",
})
if err != nil {
log.Fatalf("Failed to get/create object store: %v", err)
}
log.Printf("Created object store: %s", *bucketName)
}
err = filepath.Walk(*directory, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to read %s: %w", path, err)
}
relPath, err := filepath.Rel(*directory, path)
if err != nil {
return fmt.Errorf("failed to get relative path: %w", err)
}
objectKey := relPath
if *prefix != "" {
objectKey = strings.TrimPrefix(relPath, *prefix)
}
objectKey = filepath.ToSlash(objectKey)
if *binaryName == "" {
parts := strings.Split(objectKey, "/")
if len(parts) >= 2 {
*binaryName = parts[0]
}
}
log.Printf("Uploading %s as %s (%d bytes)", path, objectKey, len(data))
_, err = store.PutBytes(ctx, objectKey, data)
if err != nil {
return fmt.Errorf("failed to upload %s: %w", path, err)
}
log.Printf("✓ Uploaded %s", objectKey)
return nil
})
if err != nil {
log.Fatalf("Failed to upload files: %v", err)
}
log.Printf("Successfully uploaded all files from %s to NATS object store '%s'", *directory, *bucketName)
if *cleanup > 0 {
log.Printf("Cleaning up old versions, keeping %d most recent", *cleanup)
err = cleanupOldVersions(ctx, store, *binaryName, *cleanup, *cleanupAll)
if err != nil {
log.Fatalf("Failed to cleanup old versions: %v", err)
}
}
if !*skipNotify && *notifyTopic != "" {
log.Printf("Publishing update notification to topic: %s", *notifyTopic)
message := fmt.Sprintf("binaries updated in %s", *bucketName)
err = nc.Publish(*notifyTopic, []byte(message))
if err != nil {
log.Fatalf("Failed to publish notification: %v", err)
}
// Flush to ensure message is sent
err = nc.Flush()
if err != nil {
log.Fatalf("Failed to flush notification: %v", err)
}
log.Printf("✓ Published update notification")
}
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
func getEnvBool(key string, defaultValue bool) bool {
if value := os.Getenv(key); value != "" {
b, err := strconv.ParseBool(value)
if err == nil {
return b
}
}
return defaultValue
}
func getEnvInt(key string, defaultValue int) int {
if value := os.Getenv(key); value != "" {
i, err := strconv.Atoi(value)
if err == nil {
return i
}
}
return defaultValue
}
func cleanupOldVersions(ctx context.Context, store jetstream.ObjectStore, currentBinary string, keepCount int, cleanAll bool) error {
objects, err := store.List(ctx)
if err != nil {
return fmt.Errorf("failed to list objects: %w", err)
}
// Group objects by binary/architecture path
// Expected structure: binary/arch/version
versionsByPath := make(map[string][]*jetstream.ObjectInfo)
for _, obj := range objects {
parts := strings.Split(obj.Name, "/")
if len(parts) < 3 {
// Not a version path, skip
continue
}
binaryName := parts[0]
arch := parts[1]
pathKey := binaryName + "/" + arch
// If not cleaning all and this isn't the current binary, skip
if !cleanAll && currentBinary != "" && binaryName != currentBinary {
continue
}
versionsByPath[pathKey] = append(versionsByPath[pathKey], obj)
}
// For each binary/arch combination, keep only the most recent N versions
for pathKey, versions := range versionsByPath {
if len(versions) <= keepCount {
log.Printf("Path %s has %d versions, keeping all", pathKey, len(versions))
continue
}
// Sort by semantic version (newest first)
sort.Slice(versions, func(i, j int) bool {
// Extract version from path: binary/arch/version
versionI := filepath.Base(versions[i].Name)
versionJ := filepath.Base(versions[j].Name)
// Ensure versions start with 'v' for semver.Compare
if !strings.HasPrefix(versionI, "v") {
versionI = "v" + versionI
}
if !strings.HasPrefix(versionJ, "v") {
versionJ = "v" + versionJ
}
// semver.Compare returns -1, 0, or 1
// We want newest first, so reverse the comparison
return semver.Compare(versionI, versionJ) > 0
})
// Delete old versions (everything after keepCount)
toDelete := versions[keepCount:]
log.Printf("Path %s has %d versions, deleting %d old versions", pathKey, len(versions), len(toDelete))
for _, obj := range toDelete {
version := filepath.Base(obj.Name)
log.Printf("Deleting old version: %s (version: %s)", obj.Name, version)
err := store.Delete(ctx, obj.Name)
if err != nil && !errors.Is(err, jetstream.ErrObjectNotFound) {
return fmt.Errorf("failed to delete %s: %w", obj.Name, err)
}
log.Printf("✓ Deleted %s", obj.Name)
}
}
return nil
}