spegel/internal/registry/registry.go

293 lines
7.6 KiB
Go
Raw Normal View History

2023-01-24 15:47:27 +01:00
package registry
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"path"
2023-01-30 16:51:49 +01:00
"regexp"
2023-01-24 15:47:27 +01:00
"strconv"
"strings"
2023-01-24 15:47:27 +01:00
"time"
"github.com/gin-gonic/gin"
"github.com/go-logr/logr"
2023-05-16 09:10:41 +02:00
"github.com/opencontainers/go-digest"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
2023-01-24 15:47:27 +01:00
pkggin "github.com/xenitab/pkg/gin"
2023-05-19 14:50:37 +02:00
"github.com/xenitab/spegel/internal/oci"
2023-01-26 18:48:02 +01:00
"github.com/xenitab/spegel/internal/routing"
2023-01-24 15:47:27 +01:00
)
2023-08-03 12:19:43 +02:00
// MirroredHeaderKey is the HTTP request header used to mark a request that
// has already been forwarded by a registry instance. The registry handler
// sets it to "true" before proxying so that the receiving peer serves the
// content itself instead of proxying again.
const MirroredHeaderKey = "X-Spegel-Mirrored"
2023-03-10 15:03:15 +01:00
// mirrorRequestsTotal counts requests that went through the mirror handler.
// Label values are, in order: the registry (taken from the "ns" query
// parameter), the cache outcome ("hit" or "miss") and the request source
// ("internal" or "external").
var mirrorRequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "spegel_mirror_requests_total",
	Help: "Total number of mirror requests.",
}, []string{"registry", "cache", "source"})
2023-01-24 15:47:27 +01:00
type Registry struct {
ociClient oci.Client
router routing.Router
resolveRetries int
resolveTimeout time.Duration
resolveLatestTag bool
2023-08-03 13:46:33 +02:00
localAddr string
}
2023-08-03 13:46:33 +02:00
func NewRegistry(ociClient oci.Client, router routing.Router, localAddr string, resolveRetries int, resolveTimeout time.Duration, resolveLatestTag bool) *Registry {
return &Registry{
ociClient: ociClient,
router: router,
resolveRetries: resolveRetries,
resolveTimeout: resolveTimeout,
resolveLatestTag: resolveLatestTag,
2023-08-03 13:46:33 +02:00
localAddr: localAddr,
}
2023-01-24 15:47:27 +01:00
}
func (r *Registry) Server(addr string, log logr.Logger) *http.Server {
2023-01-30 16:51:49 +01:00
cfg := pkggin.Config{
LogConfig: pkggin.LogConfig{
Logger: log,
PathFilter: regexp.MustCompile("/healthz"),
IncludeLatency: true,
IncludeClientIP: true,
2023-02-06 10:56:49 +01:00
IncludeKeys: []string{"handler"},
2023-01-30 16:51:49 +01:00
},
MetricsConfig: pkggin.MetricsConfig{
HandlerID: "registry",
},
}
engine := pkggin.NewEngine(cfg)
engine.GET("/healthz", r.readyHandler)
2023-08-03 13:46:33 +02:00
engine.Any("/v2/*params", r.metricsHandler, r.registryHandler)
2023-01-24 15:47:27 +01:00
srv := &http.Server{
Addr: addr,
2023-01-30 16:51:49 +01:00
Handler: engine,
2023-01-24 15:47:27 +01:00
}
return srv
2023-01-24 15:47:27 +01:00
}
func (r *Registry) readyHandler(c *gin.Context) {
ok, err := r.router.HasMirrors()
if err != nil {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, err)
return
}
if !ok {
c.Status(http.StatusInternalServerError)
return
}
2023-01-24 22:52:13 +01:00
c.Status(http.StatusOK)
}
func (r *Registry) registryHandler(c *gin.Context) {
2023-01-24 15:47:27 +01:00
// Only deal with GET and HEAD requests.
if !(c.Request.Method == http.MethodGet || c.Request.Method == http.MethodHead) {
2023-01-24 15:47:27 +01:00
c.Status(http.StatusNotFound)
return
}
// Quickly return 200 for /v2/ to indicate that registry supports v2.
if path.Clean(c.Request.URL.Path) == "/v2" {
if c.Request.Method != http.MethodGet {
2023-01-24 15:47:27 +01:00
c.Status(http.StatusNotFound)
return
}
c.Status(http.StatusOK)
return
}
2023-07-03 17:53:36 +02:00
// Parse out path components from request.
2023-08-03 11:43:44 +02:00
ref, dgst, refType, err := oci.ParsePathComponents(c.Query("ns"), c.Request.URL.Path)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-01-24 15:47:27 +01:00
c.AbortWithError(http.StatusNotFound, err)
return
}
2023-05-16 09:10:41 +02:00
if !r.resolveLatestTag && ref != "" {
_, tag, _ := strings.Cut(ref, ":")
if tag == "latest" {
c.AbortWithStatus(http.StatusNotFound)
return
}
}
2023-05-16 09:10:41 +02:00
// Request with mirror header are proxied.
2023-08-03 12:19:43 +02:00
if c.Request.Header.Get(MirroredHeaderKey) != "true" {
// Set mirrored header in request to stop infinite loops
c.Request.Header.Set(MirroredHeaderKey, "true")
2023-07-03 17:53:36 +02:00
key := dgst.String()
if key == "" {
key = ref
}
2023-05-16 09:10:41 +02:00
r.handleMirror(c, key)
2023-01-24 15:47:27 +01:00
return
}
2023-05-16 09:10:41 +02:00
// Serve registry endpoints.
if dgst == "" {
dgst, err = r.ociClient.Resolve(c, ref)
if err != nil {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusNotFound, err)
return
}
}
switch refType {
case oci.ReferenceTypeManifest:
r.handleManifest(c, dgst)
return
case oci.ReferenceTypeBlob:
r.handleBlob(c, dgst)
2023-01-24 15:47:27 +01:00
return
}
// If nothing matches return 404.
c.Status(http.StatusNotFound)
}
func (r *Registry) handleMirror(c *gin.Context, key string) {
2023-02-06 10:56:49 +01:00
c.Set("handler", "mirror")
log := pkggin.FromContextOrDiscard(c)
// Resolve mirror with the requested key
resolveCtx, cancel := context.WithTimeout(c, r.resolveTimeout)
defer cancel()
resolveCtx = logr.NewContext(resolveCtx, log)
2023-08-03 13:46:33 +02:00
isExternal := r.isExternalRequest(c)
if isExternal {
log.Info("handling mirror request from external node", "path", c.Request.URL.Path, "ip", c.RemoteIP())
}
mirrorCh, err := r.router.Resolve(resolveCtx, key, isExternal, r.resolveRetries)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, err)
2023-01-24 15:47:27 +01:00
}
for {
select {
case <-resolveCtx.Done():
// Resolving mirror has timed out meaning one could not be found.
//nolint:errcheck // ignore
c.AbortWithError(http.StatusNotFound, fmt.Errorf("could not resolve mirror for key: %s", key))
return
case mirror, ok := <-mirrorCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("mirror resolution has been exhausted"))
return
}
// Modify response returns and error on non 200 status code and NOP error handler skips response writing.
// If proxy fails no response is written and it is tried again against a different mirror.
// If the response writer has been written to it means that the request was properly proxied.
succeeded := false
u, err := url.Parse(mirror)
if err != nil {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, err)
return
}
proxy := httputil.NewSingleHostReverseProxy(u)
proxy.ErrorHandler = func(http.ResponseWriter, *http.Request, error) {}
proxy.ModifyResponse = func(resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status)
log.Error(err, "mirror failed attempting next")
return err
}
succeeded = true
return nil
}
proxy.ServeHTTP(c.Writer, c.Request)
if !succeeded {
break
}
log.V(5).Info("mirrored request", "path", c.Request.URL.Path, "url", u.String())
return
}
2023-01-24 15:47:27 +01:00
}
}
func (r *Registry) handleManifest(c *gin.Context, dgst digest.Digest) {
2023-02-06 10:56:49 +01:00
c.Set("handler", "manifest")
2023-05-16 09:10:41 +02:00
b, mediaType, err := r.ociClient.GetBlob(c, dgst)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-01-24 15:47:27 +01:00
c.AbortWithError(http.StatusNotFound, err)
return
}
c.Header("Content-Type", mediaType)
c.Header("Content-Length", strconv.FormatInt(int64(len(b)), 10))
2023-05-16 09:10:41 +02:00
c.Header("Docker-Content-Digest", dgst.String())
if c.Request.Method == http.MethodHead {
2023-01-24 15:47:27 +01:00
return
}
_, err = c.Writer.Write(b)
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-01-24 15:47:27 +01:00
c.AbortWithError(http.StatusNotFound, err)
return
}
}
func (r *Registry) handleBlob(c *gin.Context, dgst digest.Digest) {
2023-02-06 10:56:49 +01:00
c.Set("handler", "blob")
2023-05-16 09:10:41 +02:00
size, err := r.ociClient.GetSize(c, dgst)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-05-16 09:10:41 +02:00
c.AbortWithError(http.StatusInternalServerError, err)
2023-01-24 15:47:27 +01:00
return
}
2023-05-16 09:10:41 +02:00
c.Header("Content-Length", strconv.FormatInt(size, 10))
c.Header("Docker-Content-Digest", dgst.String())
if c.Request.Method == http.MethodHead {
2023-01-24 16:40:48 +01:00
return
}
2023-05-16 09:10:41 +02:00
err = r.ociClient.WriteBlob(c, c.Writer, dgst)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-05-16 09:10:41 +02:00
c.AbortWithError(http.StatusInternalServerError, err)
2023-01-24 15:47:27 +01:00
return
}
}
2023-03-10 15:03:15 +01:00
2023-08-03 13:46:33 +02:00
func (r *Registry) metricsHandler(c *gin.Context) {
2023-03-10 15:03:15 +01:00
c.Next()
handler, ok := c.Get("handler")
if !ok {
return
}
if handler != "mirror" {
return
}
sourceType := "internal"
2023-08-03 13:46:33 +02:00
if r.isExternalRequest(c) {
2023-03-10 15:03:15 +01:00
sourceType = "external"
}
cacheType := "hit"
if c.Writer.Status() != http.StatusOK {
cacheType = "miss"
}
2023-08-03 11:43:44 +02:00
mirrorRequestsTotal.WithLabelValues(c.Query("ns"), cacheType, sourceType).Inc()
2023-03-10 15:03:15 +01:00
}
2023-08-03 13:46:33 +02:00
// isExternalRequest reports whether the request's Host differs from the
// registry's configured local address.
func (r *Registry) isExternalRequest(c *gin.Context) bool {
	host := c.Request.Host
	return host != r.localAddr
}