299 lines
12 KiB
Go
299 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/alexflint/go-arg"
|
|
"github.com/go-logr/logr"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"golang.org/x/sync/errgroup"
|
|
"k8s.io/klog/v2"
|
|
|
|
"github.com/spegel-org/spegel/internal/cleanup"
|
|
"github.com/spegel-org/spegel/internal/web"
|
|
"github.com/spegel-org/spegel/pkg/metrics"
|
|
"github.com/spegel-org/spegel/pkg/oci"
|
|
"github.com/spegel-org/spegel/pkg/registry"
|
|
"github.com/spegel-org/spegel/pkg/routing"
|
|
"github.com/spegel-org/spegel/pkg/state"
|
|
)
|
|
|
|
type ConfigurationCmd struct {
|
|
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path,env:CONTAINERD_REGISTRY_CONFIG_PATH" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
|
|
MirroredRegistries []url.URL `arg:"--mirrored-registries,env:MIRRORED_REGISTRIES" help:"Registries that are configured to be mirrored, if slice is empty all registires are mirrored."`
|
|
MirrorTargets []url.URL `arg:"--mirror-targets,env:MIRROR_TARGETS,required" help:"registries that are configured to act as mirrors."`
|
|
ResolveTags bool `arg:"--resolve-tags,env:RESOLVE_TAGS" default:"true" help:"When true Spegel will resolve tags to digests."`
|
|
PrependExisting bool `arg:"--prepend-existing,env:PREPEND_EXISTING" default:"false" help:"When true existing mirror configuration will be kept and Spegel will prepend it's configuration."`
|
|
}
|
|
|
|
type BootstrapConfig struct {
|
|
BootstrapKind string `arg:"--bootstrap-kind,env:BOOTSTRAP_KIND" help:"Kind of bootsrapper to use."`
|
|
DNSBootstrapDomain string `arg:"--dns-bootstrap-domain,env:DNS_BOOTSTRAP_DOMAIN" help:"Domain to use when bootstrapping using DNS."`
|
|
HTTPBootstrapAddr string `arg:"--http-bootstrap-addr,env:HTTP_BOOTSTRAP_ADDR" help:"Address to serve for HTTP bootstrap."`
|
|
HTTPBootstrapPeer string `arg:"--http-bootstrap-peer,env:HTTP_BOOTSTRAP_PEER" help:"Peer to HTTP bootstrap with."`
|
|
StaticBootstrapPeers []string `arg:"--static-bootstrap-peers,env:STATIC_BOOTSTRAP_PEERS" help:"Static list of peers to bootstrap with."`
|
|
}
|
|
|
|
type RegistryCmd struct {
|
|
BootstrapConfig
|
|
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path,env:CONTAINERD_REGISTRY_CONFIG_PATH" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
|
|
MetricsAddr string `arg:"--metrics-addr,env:METRICS_ADDR" default:":9090" help:"address to serve metrics."`
|
|
ContainerdSock string `arg:"--containerd-sock,env:CONTAINERD_SOCK" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
|
|
ContainerdNamespace string `arg:"--containerd-namespace,env:CONTAINERD_NAMESPACE" default:"k8s.io" help:"Containerd namespace to fetch images from."`
|
|
ContainerdContentPath string `arg:"--containerd-content-path,env:CONTAINERD_CONTENT_PATH" default:"/var/lib/containerd/io.containerd.content.v1.content" help:"Path to Containerd content store"`
|
|
DataDir string `arg:"--data-dir,env:DATA_DIR" default:"/var/lib/spegel" help:"Directory where Spegel persists data."`
|
|
RouterAddr string `arg:"--router-addr,env:ROUTER_ADDR" default:":5001" help:"address to serve router."`
|
|
RegistryAddr string `arg:"--registry-addr,env:REGISTRY_ADDR" default:":5000" help:"address to server image registry."`
|
|
MirroredRegistries []url.URL `arg:"--mirrored-registries,env:MIRRORED_REGISTRIES" help:"Registries that are configured to be mirrored, if slice is empty all registires are mirrored."`
|
|
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout,env:MIRROR_RESOLVE_TIMEOUT" default:"20ms" help:"Max duration spent finding a mirror."`
|
|
MirrorResolveRetries int `arg:"--mirror-resolve-retries,env:MIRROR_RESOLVE_RETRIES" default:"3" help:"Max amount of mirrors to attempt."`
|
|
ResolveLatestTag bool `arg:"--resolve-latest-tag,env:RESOLVE_LATEST_TAG" default:"true" help:"When true latest tags will be resolved to digests."`
|
|
DebugWebEnabled bool `arg:"--debug-web-enabled,env:DEBUG_WEB_ENABLED" default:"false" help:"When true enables debug web page."`
|
|
}
|
|
|
|
type CleanupCmd struct {
|
|
Addr string `arg:"--addr,required,env:ADDR" help:"address to run readiness probe on."`
|
|
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path,env:CONTAINERD_REGISTRY_CONFIG_PATH" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
|
|
}
|
|
|
|
type CleanupWaitCmd struct {
|
|
ProbeEndpoint string `arg:"--probe-endpoint,required,env:PROBE_ENDPOINT" help:"endpoint to probe cleanup jobs from."`
|
|
Threshold int `arg:"--threshold,env:THRESHOLD" default:"3" help:"amount of consecutive successful probes to consider cleanup done."`
|
|
Period time.Duration `arg:"--period,env:PERIOD" default:"2s" help:"address to run readiness probe on."`
|
|
}
|
|
|
|
type Arguments struct {
|
|
Configuration *ConfigurationCmd `arg:"subcommand:configuration"`
|
|
Registry *RegistryCmd `arg:"subcommand:registry"`
|
|
Cleanup *CleanupCmd `arg:"subcommand:cleanup"`
|
|
CleanupWait *CleanupWaitCmd `arg:"subcommand:cleanup-wait"`
|
|
LogLevel slog.Level `arg:"--log-level,env:LOG_LEVEL" default:"INFO" help:"Minimum log level to output. Value should be DEBUG, INFO, WARN, or ERROR."`
|
|
}
|
|
|
|
func main() {
|
|
args := &Arguments{}
|
|
arg.MustParse(args)
|
|
|
|
opts := slog.HandlerOptions{
|
|
AddSource: true,
|
|
Level: args.LogLevel,
|
|
}
|
|
handler := slog.NewJSONHandler(os.Stderr, &opts)
|
|
log := logr.FromSlogHandler(handler)
|
|
klog.SetLogger(log)
|
|
ctx := logr.NewContext(context.Background(), log)
|
|
|
|
err := run(ctx, args)
|
|
if err != nil {
|
|
log.Error(err, "run exit with error")
|
|
os.Exit(1)
|
|
}
|
|
log.Info("gracefully shutdown")
|
|
}
|
|
|
|
func run(ctx context.Context, args *Arguments) error {
|
|
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM)
|
|
defer cancel()
|
|
switch {
|
|
case args.Configuration != nil:
|
|
return configurationCommand(ctx, args.Configuration)
|
|
case args.Registry != nil:
|
|
return registryCommand(ctx, args.Registry)
|
|
case args.Cleanup != nil:
|
|
return cleanupCommand(ctx, args.Cleanup)
|
|
case args.CleanupWait != nil:
|
|
return cleanupWaitCommand(ctx, args.CleanupWait)
|
|
default:
|
|
return errors.New("unknown subcommand")
|
|
}
|
|
}
|
|
|
|
func configurationCommand(ctx context.Context, args *ConfigurationCmd) error {
|
|
username, password, err := loadBasicAuth()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = oci.AddMirrorConfiguration(ctx, args.ContainerdRegistryConfigPath, args.MirroredRegistries, args.MirrorTargets, args.ResolveTags, args.PrependExisting, username, password)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
|
|
log := logr.FromContextOrDiscard(ctx)
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
|
|
username, password, err := loadBasicAuth()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// OCI Store
|
|
ociStore, err := oci.NewContainerd(args.ContainerdSock, args.ContainerdNamespace, args.ContainerdRegistryConfigPath, args.MirroredRegistries, oci.WithContentPath(args.ContainerdContentPath))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = ociStore.Verify(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Router
|
|
_, registryPort, err := net.SplitHostPort(args.RegistryAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bootstrapper, err := getBootstrapper(args.BootstrapConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
routerOpts := []routing.P2PRouterOption{
|
|
routing.WithDataDir(args.DataDir),
|
|
}
|
|
router, err := routing.NewP2PRouter(ctx, args.RouterAddr, bootstrapper, registryPort, routerOpts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
g.Go(func() error {
|
|
return router.Run(ctx)
|
|
})
|
|
|
|
// State tracking
|
|
g.Go(func() error {
|
|
err := state.Track(ctx, ociStore, router, args.ResolveLatestTag)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Registry
|
|
registryOpts := []registry.RegistryOption{
|
|
registry.WithResolveLatestTag(args.ResolveLatestTag),
|
|
registry.WithResolveRetries(args.MirrorResolveRetries),
|
|
registry.WithResolveTimeout(args.MirrorResolveTimeout),
|
|
registry.WithLogger(log),
|
|
registry.WithBasicAuth(username, password),
|
|
}
|
|
reg, err := registry.NewRegistry(ociStore, router, registryOpts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
regSrv, err := reg.Server(args.RegistryAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
g.Go(func() error {
|
|
if err := regSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
g.Go(func() error {
|
|
<-ctx.Done()
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
return regSrv.Shutdown(shutdownCtx)
|
|
})
|
|
|
|
// Metrics
|
|
metrics.Register()
|
|
mux := http.NewServeMux()
|
|
mux.Handle("/metrics", promhttp.HandlerFor(metrics.DefaultGatherer, promhttp.HandlerOpts{}))
|
|
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
|
|
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
|
|
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
|
|
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
|
|
mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
|
|
mux.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
|
|
mux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
|
|
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
|
|
mux.Handle("/debug/pprof/block", pprof.Handler("block"))
|
|
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
|
|
if args.DebugWebEnabled {
|
|
web, err := web.NewWeb(router)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mux.Handle("/debug/web/", web.Handler(log))
|
|
}
|
|
metricsSrv := &http.Server{
|
|
Addr: args.MetricsAddr,
|
|
Handler: mux,
|
|
}
|
|
g.Go(func() error {
|
|
if err := metricsSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
g.Go(func() error {
|
|
<-ctx.Done()
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
return metricsSrv.Shutdown(shutdownCtx)
|
|
})
|
|
|
|
log.Info("running Spegel", "registry", args.RegistryAddr, "router", args.RouterAddr)
|
|
err = g.Wait()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func cleanupCommand(ctx context.Context, args *CleanupCmd) error {
|
|
err := cleanup.Run(ctx, args.Addr, args.ContainerdRegistryConfigPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func cleanupWaitCommand(ctx context.Context, args *CleanupWaitCmd) error {
|
|
err := cleanup.Wait(ctx, args.ProbeEndpoint, args.Period, args.Threshold)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getBootstrapper(cfg BootstrapConfig) (routing.Bootstrapper, error) { //nolint: ireturn // Return type can be different structs.
|
|
switch cfg.BootstrapKind {
|
|
case "dns":
|
|
return routing.NewDNSBootstrapper(cfg.DNSBootstrapDomain, 10), nil
|
|
case "http":
|
|
return routing.NewHTTPBootstrapper(cfg.HTTPBootstrapAddr, cfg.HTTPBootstrapPeer), nil
|
|
case "static":
|
|
return routing.NewStaticBootstrapperFromStrings(cfg.StaticBootstrapPeers)
|
|
default:
|
|
return nil, fmt.Errorf("unknown bootstrap kind %s", cfg.BootstrapKind)
|
|
}
|
|
}
|
|
|
|
func loadBasicAuth() (string, string, error) {
|
|
dirPath := "/etc/secrets/basic-auth"
|
|
username, err := os.ReadFile(filepath.Join(dirPath, "username"))
|
|
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
return "", "", err
|
|
}
|
|
password, err := os.ReadFile(filepath.Join(dirPath, "password"))
|
|
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
return "", "", err
|
|
}
|
|
return string(username), string(password), nil
|
|
}
|