spegel/internal/registry/registry.go

278 lines
7.4 KiB
Go
Raw Normal View History

2023-01-24 15:47:27 +01:00
package registry
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"path"
2023-01-30 16:51:49 +01:00
"regexp"
2023-01-24 15:47:27 +01:00
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/go-logr/logr"
2023-05-16 09:10:41 +02:00
"github.com/opencontainers/go-digest"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
2023-01-24 15:47:27 +01:00
pkggin "github.com/xenitab/pkg/gin"
"github.com/xenitab/spegel/internal/header"
2023-05-19 14:50:37 +02:00
"github.com/xenitab/spegel/internal/oci"
2023-01-26 18:48:02 +01:00
"github.com/xenitab/spegel/internal/routing"
2023-01-24 15:47:27 +01:00
)
2023-03-10 15:03:15 +01:00
// mirrorRequestsTotal counts mirror requests. Each sample is labelled, in
// order, with the remote registry, the cache outcome and the request source.
var mirrorRequestsTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Help: "Total number of mirror requests.",
		Name: "spegel_mirror_requests_total",
	},
	[]string{"registry", "cache", "source"},
)
2023-01-24 15:47:27 +01:00
// Registry serves registry (/v2) requests, either from the local OCI client
// or by proxying them to a mirror resolved through the router.
type Registry struct {
	// ociClient resolves references and reads blob content and sizes.
	ociClient oci.Client
	// router resolves a key to the mirrors a request can be proxied to.
	router routing.Router
	// resolveRetries is forwarded to the router on every mirror resolution.
	resolveRetries int
	// resolveTimeout bounds the context used while resolving mirrors.
	resolveTimeout time.Duration
}
func NewRegistry(ociClient oci.Client, router routing.Router, resolveRetries int, resolveTimeout time.Duration) *Registry {
return &Registry{
ociClient: ociClient,
router: router,
resolveRetries: resolveRetries,
resolveTimeout: resolveTimeout,
}
2023-01-24 15:47:27 +01:00
}
func (r *Registry) Server(addr string, log logr.Logger) *http.Server {
2023-01-30 16:51:49 +01:00
cfg := pkggin.Config{
LogConfig: pkggin.LogConfig{
Logger: log,
PathFilter: regexp.MustCompile("/healthz"),
IncludeLatency: true,
IncludeClientIP: true,
2023-02-06 10:56:49 +01:00
IncludeKeys: []string{"handler"},
2023-01-30 16:51:49 +01:00
},
MetricsConfig: pkggin.MetricsConfig{
HandlerID: "registry",
},
}
engine := pkggin.NewEngine(cfg)
engine.GET("/healthz", r.readyHandler)
engine.Any("/v2/*params", metricsHandler, r.registryHandler)
2023-01-24 15:47:27 +01:00
srv := &http.Server{
Addr: addr,
2023-01-30 16:51:49 +01:00
Handler: engine,
2023-01-24 15:47:27 +01:00
}
return srv
2023-01-24 15:47:27 +01:00
}
func (r *Registry) readyHandler(c *gin.Context) {
2023-01-24 22:52:13 +01:00
c.Status(http.StatusOK)
}
func (r *Registry) registryHandler(c *gin.Context) {
2023-01-24 15:47:27 +01:00
// Only deal with GET and HEAD requests.
if !(c.Request.Method == http.MethodGet || c.Request.Method == http.MethodHead) {
2023-01-24 15:47:27 +01:00
c.Status(http.StatusNotFound)
return
}
2023-01-24 16:40:11 +01:00
2023-01-24 15:47:27 +01:00
// Quickly return 200 for /v2/ to indicate that registry supports v2.
if path.Clean(c.Request.URL.Path) == "/v2" {
if c.Request.Method != http.MethodGet {
2023-01-24 15:47:27 +01:00
c.Status(http.StatusNotFound)
return
}
c.Status(http.StatusOK)
return
}
// Always expect remoteRegistry header to be passed in request.
remoteRegistry, err := header.GetRemoteRegistry(c.Request.Header)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-01-24 15:47:27 +01:00
c.AbortWithError(http.StatusNotFound, err)
return
}
2023-07-03 17:53:36 +02:00
// Parse out path components from request.
2023-05-16 09:10:41 +02:00
ref, dgst, refType, err := oci.ParsePathComponents(remoteRegistry, c.Request.URL.Path)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-01-24 15:47:27 +01:00
c.AbortWithError(http.StatusNotFound, err)
return
}
2023-05-16 09:10:41 +02:00
// Request with mirror header are proxied.
if header.IsMirrorRequest(c.Request.Header) {
2023-07-03 17:53:36 +02:00
key := dgst.String()
if key == "" {
key = ref
}
2023-05-16 09:10:41 +02:00
r.handleMirror(c, key)
2023-01-24 15:47:27 +01:00
return
}
2023-05-16 09:10:41 +02:00
// Serve registry endpoints.
if dgst == "" {
dgst, err = r.ociClient.Resolve(c, ref)
if err != nil {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusNotFound, err)
return
}
}
switch refType {
case oci.ReferenceTypeManifest:
r.handleManifest(c, dgst)
return
case oci.ReferenceTypeBlob:
r.handleBlob(c, dgst)
2023-01-24 15:47:27 +01:00
return
}
// If nothing matches return 404.
c.Status(http.StatusNotFound)
}
// TODO: Explore if it is worth returning early if router is not populated.
func (r *Registry) handleMirror(c *gin.Context, key string) {
2023-02-06 10:56:49 +01:00
c.Set("handler", "mirror")
log := pkggin.FromContextOrDiscard(c)
2023-01-24 15:47:27 +01:00
// Disable mirroring so we dont end with an infinite loop
c.Request.Header[header.MirrorHeader] = []string{"false"}
2023-01-24 15:47:27 +01:00
// We should allow resolving to ourself if the mirror request is external.
isExternal := header.IsExternalRequest(c.Request.Header)
if isExternal {
log.Info("handling mirror request from external node", "path", c.Request.URL.Path, "ip", c.RemoteIP())
}
// Resolve mirror with the requested key
resolveCtx, cancel := context.WithTimeout(c, r.resolveTimeout)
defer cancel()
resolveCtx = logr.NewContext(resolveCtx, log)
mirrorCh, err := r.router.Resolve(resolveCtx, key, isExternal, r.resolveRetries)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, err)
2023-01-24 15:47:27 +01:00
}
for {
select {
case <-resolveCtx.Done():
// Resolving mirror has timed out meaning one could not be found.
//nolint:errcheck // ignore
c.AbortWithError(http.StatusNotFound, fmt.Errorf("could not resolve mirror for key: %s", key))
return
case mirror, ok := <-mirrorCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("mirror resolution has been exhausted"))
return
}
// Modify response returns and error on non 200 status code and NOP error handler skips response writing.
// If proxy fails no response is written and it is tried again against a different mirror.
// If the response writer has been written to it means that the request was properly proxied.
succeeded := false
u, err := url.Parse(mirror)
if err != nil {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, err)
return
}
proxy := httputil.NewSingleHostReverseProxy(u)
proxy.ErrorHandler = func(http.ResponseWriter, *http.Request, error) {}
proxy.ModifyResponse = func(resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status)
log.Error(err, "mirror failed attempting next")
return err
}
succeeded = true
return nil
}
proxy.ServeHTTP(c.Writer, c.Request)
if !succeeded {
break
}
log.V(5).Info("mirrored request", "path", c.Request.URL.Path, "url", u.String())
return
}
2023-01-24 15:47:27 +01:00
}
}
func (r *Registry) handleManifest(c *gin.Context, dgst digest.Digest) {
2023-02-06 10:56:49 +01:00
c.Set("handler", "manifest")
2023-05-16 09:10:41 +02:00
b, mediaType, err := r.ociClient.GetBlob(c, dgst)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-01-24 15:47:27 +01:00
c.AbortWithError(http.StatusNotFound, err)
return
}
c.Header("Content-Type", mediaType)
c.Header("Content-Length", strconv.FormatInt(int64(len(b)), 10))
2023-05-16 09:10:41 +02:00
c.Header("Docker-Content-Digest", dgst.String())
if c.Request.Method == http.MethodHead {
2023-01-24 15:47:27 +01:00
return
}
_, err = c.Writer.Write(b)
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-01-24 15:47:27 +01:00
c.AbortWithError(http.StatusNotFound, err)
return
}
}
func (r *Registry) handleBlob(c *gin.Context, dgst digest.Digest) {
2023-02-06 10:56:49 +01:00
c.Set("handler", "blob")
2023-05-16 09:10:41 +02:00
size, err := r.ociClient.GetSize(c, dgst)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-05-16 09:10:41 +02:00
c.AbortWithError(http.StatusInternalServerError, err)
2023-01-24 15:47:27 +01:00
return
}
2023-05-16 09:10:41 +02:00
c.Header("Content-Length", strconv.FormatInt(size, 10))
c.Header("Docker-Content-Digest", dgst.String())
if c.Request.Method == http.MethodHead {
2023-01-24 16:40:48 +01:00
return
}
2023-05-16 09:10:41 +02:00
err = r.ociClient.WriteBlob(c, c.Writer, dgst)
2023-01-24 15:47:27 +01:00
if err != nil {
2023-02-06 14:05:46 +01:00
//nolint:errcheck // ignore
2023-05-16 09:10:41 +02:00
c.AbortWithError(http.StatusInternalServerError, err)
2023-01-24 15:47:27 +01:00
return
}
}
2023-03-10 15:03:15 +01:00
func metricsHandler(c *gin.Context) {
c.Next()
handler, ok := c.Get("handler")
if !ok {
return
}
if handler != "mirror" {
return
}
remoteRegistry, err := header.GetRemoteRegistry(c.Request.Header)
2023-03-10 15:03:15 +01:00
if err != nil {
return
}
sourceType := "internal"
if header.IsExternalRequest(c.Request.Header) {
2023-03-10 15:03:15 +01:00
sourceType = "external"
}
cacheType := "hit"
if c.Writer.Status() != http.StatusOK {
cacheType = "miss"
}
mirrorRequestsTotal.WithLabelValues(remoteRegistry, cacheType, sourceType).Inc()
}