spegel/main.go
Lukasz Raczylo aa5d3190bc
Realign the structs in an attempt to minimise memory usage.
Results:
```
spegel/internal/routing/bootstrap.go:20:29: struct with 88 pointer bytes could be 64
spegel/internal/routing/mock.go:8:17: struct with 32 pointer bytes could be 8
spegel/pkg/oci/containerd.go:371:15: struct with 24 pointer bytes could be 16
spegel/internal/registry/registry.go:38:15: struct with 64 pointer bytes could be 40
spegel/main.go:38:18: struct with 200 pointer bytes could be 168
spegel/internal/registry/registry_test.go:65:13: struct with 64 pointer bytes could be 48
spegel/pkg/oci/containerd_test.go:24:13: struct with 64 pointer bytes could be 56
spegel/pkg/oci/containerd_test.go:249:13: struct with 64 pointer bytes could be 56
spegel/pkg/oci/containerd_test.go:281:13: struct of size 96 could be 88
spegel/pkg/oci/image_test.go:12:13: struct with 80 pointer bytes could be 72
```
2023-12-29 11:46:32 +00:00

178 lines
6.2 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
"time"
"github.com/alexflint/go-arg"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/afero"
pkgkubernetes "github.com/xenitab/pkg/kubernetes"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
"github.com/xenitab/spegel/internal/registry"
"github.com/xenitab/spegel/internal/routing"
"github.com/xenitab/spegel/internal/state"
"github.com/xenitab/spegel/pkg/oci"
)
// ConfigurationCmd holds the flags of the "configuration" subcommand.
// The struct tags are read by go-arg: `arg` names the flag (and marks it
// required), `default` supplies the fallback value and `help` is the usage text.
// Every field is forwarded to oci.AddMirrorConfiguration by configurationCommand.
type ConfigurationCmd struct {
// Directory passed to oci.AddMirrorConfiguration as the config path.
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
// Upstream registries to be mirrored (required).
Registries []url.URL `arg:"--registries,required" help:"registries that are configured to be mirrored."`
// Registries acting as mirrors for the ones above (required).
MirrorRegistries []url.URL `arg:"--mirror-registries,required" help:"registries that are configured to act as mirrors."`
// Whether tags are resolved to digests; defaults to true.
ResolveTags bool `arg:"--resolve-tags" default:"true" help:"When true Spegel will resolve tags to digests."`
}
// RegistryCmd holds the flags of the "registry" subcommand, consumed by
// registryCommand. The struct tags are read by go-arg: `arg` names the flag
// (and marks it required), `default` supplies the fallback value and `help` is
// the usage text shown to the user.
//
// NOTE(review): the field order appears to come from a fieldalignment pass
// (see the file history); reordering fields changes the memory layout and
// should be done deliberately.
type RegistryCmd struct {
	KubeconfigPath               string        `arg:"--kubeconfig-path" help:"Path to the kubeconfig file."`
	ContainerdRegistryConfigPath string        `arg:"--containerd-registry-config-path" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
	MetricsAddr                  string        `arg:"--metrics-addr,required" help:"address to serve metrics."`
	LocalAddr                    string        `arg:"--local-addr,required" help:"Address that the local Spegel instance will be reached at."`
	ContainerdSock               string        `arg:"--containerd-sock" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
	ContainerdNamespace          string        `arg:"--containerd-namespace" default:"k8s.io" help:"Containerd namespace to fetch images from."`
	RouterAddr                   string        `arg:"--router-addr,required" help:"address to serve router."`
	LeaderElectionName           string        `arg:"--leader-election-name" default:"spegel-leader-election" help:"Name of leader election."`
	LeaderElectionNamespace      string        `arg:"--leader-election-namespace" default:"spegel" help:"Kubernetes namespace to write leader election data."`
	RegistryAddr                 string        `arg:"--registry-addr,required" help:"address to serve image registry."`
	Registries                   []url.URL     `arg:"--registries,required" help:"registries that are configured to be mirrored."`
	MirrorResolveTimeout         time.Duration `arg:"--mirror-resolve-timeout" default:"5s" help:"Max duration spent finding a mirror."`
	MirrorResolveRetries         int           `arg:"--mirror-resolve-retries" default:"3" help:"Max amount of mirrors to attempt."`
	ResolveLatestTag             bool          `arg:"--resolve-latest-tag" default:"true" help:"When true latest tags will be resolved to digests."`
}
// Arguments is the top-level command line definition parsed by go-arg in main.
// Each field is a subcommand; run dispatches on whichever pointer is non-nil.
type Arguments struct {
// Set when the "configuration" subcommand is selected.
Configuration *ConfigurationCmd `arg:"subcommand:configuration"`
// Set when the "registry" subcommand is selected.
Registry *RegistryCmd `arg:"subcommand:registry"`
}
// main parses the command line, builds a production zap logger, installs it
// as the klog logger and stores it in the context, then hands control to run.
// The process exits with status 1 when run returns an error.
func main() {
	args := &Arguments{}
	arg.MustParse(args)

	zapLog, err := zap.NewProduction()
	if err != nil {
		// Without a logger there is nowhere to report the failure.
		panic(fmt.Sprintf("who watches the watchmen (%v)?", err))
	}
	log := zapr.NewLogger(zapLog)
	klog.SetLogger(log)
	ctx := logr.NewContext(context.Background(), log)

	if err := run(ctx, args); err != nil {
		// Use a constant, non-empty message so the fatal entry is searchable;
		// the error itself is attached as a structured field.
		log.Error(err, "run exit with error")
		os.Exit(1)
	}
	log.Info("gracefully shutdown")
}
// run derives a context that is cancelled on SIGTERM and dispatches to the
// subcommand that was populated on args. Configuration takes precedence over
// Registry; when neither is set an "unknown subcommand" error is returned.
func run(ctx context.Context, args *Arguments) error {
	sigCtx, stop := signal.NotifyContext(ctx, syscall.SIGTERM)
	defer stop()

	if cfg := args.Configuration; cfg != nil {
		return configurationCommand(sigCtx, cfg)
	}
	if reg := args.Registry; reg != nil {
		return registryCommand(sigCtx, reg)
	}
	return errors.New("unknown subcommand")
}
// configurationCommand implements the "configuration" subcommand. It hands an
// afero.NewOsFs filesystem and every flag from args to
// oci.AddMirrorConfiguration and returns that call's error unchanged.
func configurationCommand(ctx context.Context, args *ConfigurationCmd) error {
	return oci.AddMirrorConfiguration(
		ctx,
		afero.NewOsFs(),
		args.ContainerdRegistryConfigPath,
		args.Registries,
		args.MirrorRegistries,
		args.ResolveTags,
	)
}
// registryCommand implements the "registry" subcommand.
//
// It builds the Kubernetes clientset, the containerd OCI client and the P2P
// router, then runs the following inside one errgroup:
//   - an HTTP server exposing promhttp metrics at /metrics on args.MetricsAddr,
//   - the router (closed once the group context is done),
//   - state.Track,
//   - the registry HTTP server on args.RegistryAddr.
//
// Both HTTP servers are shut down with a 30 second grace period when the group
// context is cancelled. The function blocks until every goroutine has finished
// and returns the error reported by g.Wait.
//
// All fallible setup runs before the first goroutine is started, so an early
// error return never leaves a listening server behind that nothing waits for.
func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
	log := logr.FromContextOrDiscard(ctx)
	g, ctx := errgroup.WithContext(ctx)

	// --- Fallible setup: nothing below this section may return early. ---
	cs, err := pkgkubernetes.GetKubernetesClientset(args.KubeconfigPath)
	if err != nil {
		return err
	}
	ociClient, err := oci.NewContainerd(args.ContainerdSock, args.ContainerdNamespace, args.ContainerdRegistryConfigPath, args.Registries)
	if err != nil {
		return err
	}
	err = ociClient.Verify(ctx)
	if err != nil {
		return err
	}
	// Only the port of the registry address is handed to the router.
	_, registryPort, err := net.SplitHostPort(args.RegistryAddr)
	if err != nil {
		return err
	}
	bootstrapper := routing.NewKubernetesBootstrapper(cs, args.LeaderElectionNamespace, args.LeaderElectionName)
	router, err := routing.NewP2PRouter(ctx, args.RouterAddr, bootstrapper, registryPort)
	if err != nil {
		return err
	}

	// --- Metrics server ---
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	metricsSrv := &http.Server{
		Addr:    args.MetricsAddr,
		Handler: mux,
	}
	g.Go(func() error {
		// http.ErrServerClosed is the expected result of Shutdown, not a failure.
		if err := metricsSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	})
	g.Go(func() error {
		<-ctx.Done()
		// ctx is already cancelled here, so shutdown needs a fresh context.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		return metricsSrv.Shutdown(shutdownCtx)
	})

	// --- Router and state tracking ---
	g.Go(func() error {
		<-ctx.Done()
		return router.Close()
	})
	g.Go(func() error {
		state.Track(ctx, ociClient, router, args.ResolveLatestTag)
		return nil
	})

	// --- Registry server ---
	reg := registry.NewRegistry(ociClient, router, args.LocalAddr, args.MirrorResolveRetries, args.MirrorResolveTimeout, args.ResolveLatestTag)
	regSrv := reg.Server(args.RegistryAddr, log)
	g.Go(func() error {
		if err := regSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	})
	g.Go(func() error {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		return regSrv.Shutdown(shutdownCtx)
	})

	log.Info("running registry", "addr", args.RegistryAddr)
	return g.Wait()
}