Refactor mirroring to use OCI file

Signed-off-by: Philip Laine <philip.laine@gmail.com>
This commit is contained in:
Philip Laine 2025-05-22 14:35:17 +02:00
parent 7a2420f7d5
commit c472acfa42
No known key found for this signature in database
GPG Key ID: F6D0B743CA3EFF33
4 changed files with 180 additions and 146 deletions

View File

@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#877](https://github.com/spegel-org/spegel/pull/877) Add support for www authenticate header.
- [#878](https://github.com/spegel-org/spegel/pull/878) Add dial timeout configuration in Containerd mirror configuration.
- [#889](https://github.com/spegel-org/spegel/pull/889) Add support for content create events.
- [#895](https://github.com/spegel-org/spegel/pull/895) Refactor mirroring to use OCI file.
### Changed

123
internal/ocifs/ocifs.go Normal file
View File

@ -0,0 +1,123 @@
package ocifs
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
"github.com/go-logr/logr"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/spegel-org/spegel/pkg/oci"
"github.com/spegel-org/spegel/pkg/routing"
)
type OCIFile interface {
io.ReadCloser
Descriptor() (ocispec.Descriptor, error)
}
var _ OCIFile = &RoutedFile{}
type RoutedFile struct {
fetch func() (*http.Response, error)
}
func NewRoutedFile(ctx context.Context, router routing.Router, dist oci.DistributionPath, method string, forwardScheme string, timeout time.Duration, retries int) (*RoutedFile, error) {
//nolint:bodyclose // Response body is only closed in Close().
fetch := sync.OnceValues(func() (*http.Response, error) {
resolveCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
peerCh, err := router.Resolve(resolveCtx, dist.Reference(), retries)
if err != nil {
return nil, err
}
u := dist.URL()
u.Scheme = forwardScheme
mirrorAttempts := 0
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("routing for OCI file has been cancelled: %w", ctx.Err())
case peer, ok := <-peerCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
err = fmt.Errorf("mirror with OCI file %s could not be found", dist.Reference())
if mirrorAttempts > 0 {
err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts))
}
return nil, err
}
u.Host = peer.String()
resp, err := doRequest(ctx, http.DefaultClient, method, u)
if err != nil {
logr.FromContextOrDiscard(ctx).Error(err, "request to mirror failed", "attempt", mirrorAttempts, "path", u.Path, "mirror", peer)
continue
}
return resp, nil
}
}
})
f := &RoutedFile{
fetch: fetch,
}
return f, nil
}
func (f *RoutedFile) Read(p []byte) (n int, err error) {
//nolint:bodyclose // Response body is only closed in Close().
resp, err := f.fetch()
if err != nil {
return 0, err
}
return resp.Body.Read(p)
}
func (f *RoutedFile) Close() error {
resp, err := f.fetch()
if err != nil {
return err
}
_, copyErr := io.Copy(io.Discard, resp.Body)
closeErr := resp.Body.Close()
return errors.Join(closeErr, copyErr)
}
func (f *RoutedFile) Descriptor() (ocispec.Descriptor, error) {
//nolint:bodyclose // Response body is only closed in Close().
resp, err := f.fetch()
if err != nil {
return ocispec.Descriptor{}, err
}
desc, err := oci.DescriptorFromHeader(resp.Header)
if err != nil {
return ocispec.Descriptor{}, err
}
return desc, nil
}
func doRequest(ctx context.Context, client *http.Client, method string, u *url.URL) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("X-Spegel-Mirrored", "true")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
//nolint:staticcheck // Ignore until replaced with status error.
if !(resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent) {
return nil, fmt.Errorf("expected status code %s but got %s", http.StatusText(http.StatusOK), resp.Status)
}
return resp, nil
}

View File

@ -1,13 +1,10 @@
package registry
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/netip"
"net/url"
"path"
"strconv"
"sync"
@ -15,6 +12,7 @@ import (
"github.com/go-logr/logr"
"github.com/spegel-org/spegel/internal/ocifs"
"github.com/spegel-org/spegel/pkg/httpx"
"github.com/spegel-org/spegel/pkg/metrics"
"github.com/spegel-org/spegel/pkg/oci"
@ -235,8 +233,6 @@ func (r *Registry) registryHandler(rw httpx.ResponseWriter, req *http.Request) {
}
func (r *Registry) handleMirror(rw httpx.ResponseWriter, req *http.Request, dist oci.DistributionPath) {
log := r.log.WithValues("ref", dist.Reference(), "path", req.URL.Path)
defer func() {
cacheType := "hit"
if rw.Status() != http.StatusOK {
@ -251,44 +247,31 @@ func (r *Registry) handleMirror(rw httpx.ResponseWriter, req *http.Request, dist
return
}
// Resolve mirror with the requested reference
resolveCtx, cancel := context.WithTimeout(req.Context(), r.resolveTimeout)
defer cancel()
resolveCtx = logr.NewContext(resolveCtx, log)
peerCh, err := r.router.Resolve(resolveCtx, dist.Reference(), r.resolveRetries)
forwardScheme := "http"
if req.TLS != nil {
forwardScheme = "https"
}
f, err := ocifs.NewRoutedFile(logr.NewContext(req.Context(), r.log), r.router, dist, req.Method, forwardScheme, r.resolveTimeout, r.resolveRetries)
if err != nil {
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("error occurred when attempting to resolve mirrors: %w", err))
rw.WriteError(http.StatusNotFound, err)
return
}
defer f.Close()
mirrorAttempts := 0
for {
select {
case <-req.Context().Done():
// Request has been closed by server or client. No use continuing.
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", dist.Reference(), resolveCtx.Err()))
return
case peer, ok := <-peerCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
err = fmt.Errorf("mirror with image component %s could not be found", dist.Reference())
if mirrorAttempts > 0 {
err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts))
}
rw.WriteError(http.StatusNotFound, err)
return
}
desc, err := f.Descriptor()
if err != nil {
rw.WriteError(http.StatusNotFound, err)
return
}
oci.WriteDescriptorToHeader(desc, rw.Header())
mirrorAttempts++
err := forwardRequest(r.client, r.bufferPool, req, rw, peer)
if err != nil {
log.Error(err, "request to mirror failed", "attempt", mirrorAttempts, "path", req.URL.Path, "mirror", peer)
continue
}
log.V(4).Info("mirrored request", "path", req.URL.Path, "mirror", peer)
return
}
//nolint: errcheck // Ignore
buf := r.bufferPool.Get().(*[]byte)
defer r.bufferPool.Put(buf)
_, err = io.CopyBuffer(rw, f, *buf)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
}
@ -342,58 +325,3 @@ func (r *Registry) handleBlob(rw httpx.ResponseWriter, req *http.Request, dist o
http.ServeContent(rw, req, "", time.Time{}, rc)
}
func forwardRequest(client *http.Client, bufferPool *sync.Pool, req *http.Request, rw http.ResponseWriter, addrPort netip.AddrPort) error {
// Do request to mirror.
forwardScheme := "http"
if req.TLS != nil {
forwardScheme = "https"
}
u := &url.URL{
Scheme: forwardScheme,
Host: addrPort.String(),
Path: req.URL.Path,
RawQuery: req.URL.RawQuery,
}
forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method, u.String(), nil)
if err != nil {
return err
}
copyHeader(forwardReq.Header, req.Header)
forwardResp, err := client.Do(forwardReq)
if err != nil {
return err
}
defer forwardResp.Body.Close()
// Clear body and try next if non 200 response.
//nolint:staticcheck // Keep things readable.
if !(forwardResp.StatusCode == http.StatusOK || forwardResp.StatusCode == http.StatusPartialContent) {
_, err = io.Copy(io.Discard, forwardResp.Body)
if err != nil {
return err
}
return fmt.Errorf("expected mirror to respond with 200 OK but received: %s", forwardResp.Status)
}
// TODO (phillebaba): Is it possible to retry if copy fails half way through?
// Copy forward response to response writer.
copyHeader(rw.Header(), forwardResp.Header)
rw.WriteHeader(http.StatusOK)
//nolint: errcheck // Ignore
buf := bufferPool.Get().(*[]byte)
defer bufferPool.Put(buf)
_, err = io.CopyBuffer(rw, forwardResp.Body, *buf)
if err != nil {
return err
}
return nil
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}

View File

@ -6,12 +6,16 @@ import (
"net/http"
"net/http/httptest"
"net/netip"
"strconv"
"testing"
"time"
"github.com/go-logr/logr"
tlog "github.com/go-logr/logr/testing"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/require"
"github.com/spegel-org/spegel/pkg/httpx"
"github.com/spegel-org/spegel/pkg/oci"
"github.com/spegel-org/spegel/pkg/routing"
)
@ -143,7 +147,6 @@ func TestMirrorHandler(t *testing.T) {
badSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("foo", "bar")
if r.Method == http.MethodGet {
//nolint:errcheck // ignore
w.Write([]byte("hello world"))
@ -154,10 +157,14 @@ func TestMirrorHandler(t *testing.T) {
})
badAddrPort := netip.MustParseAddrPort(badSvr.Listener.Addr().String())
goodSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("foo", "bar")
w.Header().Set(httpx.HeaderContentLength, "0")
w.Header().Set(httpx.HeaderContentType, "application/octet-stream")
w.Header().Set(oci.HeaderDockerDigest, ocispec.DescriptorEmptyJSON.Digest.String())
if r.Method == http.MethodGet {
b := []byte("hello world")
w.Header().Set(httpx.HeaderContentLength, strconv.FormatInt(int64(len(b)), 10))
//nolint:errcheck // ignore
w.Write([]byte("hello world"))
w.Write(b)
}
}))
t.Cleanup(func() {
@ -177,50 +184,44 @@ func TestMirrorHandler(t *testing.T) {
"sha256:11242d2a347bf8ab30b9f92d5ca219bbbedf95df5a8b74631194561497c1fae8": {badAddrPort, badAddrPort, goodAddrPort},
}
router := routing.NewMemoryRouter(resolver, netip.AddrPort{})
reg, err := NewRegistry(oci.NewMemory(), router)
reg, err := NewRegistry(oci.NewMemory(), router, WithLogger(tlog.NewTestLogger(t)))
require.NoError(t, err)
tests := []struct {
expectedHeaders map[string][]string
name string
key string
expectedBody string
expectedStatus int
name string
key string
expectedBody string
expectedStatus int
}{
{
name: "request should timeout when no peers exists",
key: "no-peers",
expectedStatus: http.StatusNotFound,
expectedBody: "",
expectedHeaders: nil,
name: "request should timeout when no peers exists",
key: "no-peers",
expectedStatus: http.StatusNotFound,
expectedBody: "",
},
{
name: "request should not timeout and give 404 if all peers fail",
key: "sha256:c3e30fbcf3b231356a1efbd30a8ccec75134a7a8b45217ede97f4ff483540b04",
expectedStatus: http.StatusNotFound,
expectedBody: "",
expectedHeaders: nil,
name: "request should not timeout and give 404 if all peers fail",
key: "sha256:c3e30fbcf3b231356a1efbd30a8ccec75134a7a8b45217ede97f4ff483540b04",
expectedStatus: http.StatusNotFound,
expectedBody: "",
},
{
name: "request should work when first peer responds",
key: "sha256:3b8a55c543ccc7ae01c47b1d35af5826a6439a9b91ab0ca96de9967759279896",
expectedStatus: http.StatusOK,
expectedBody: "hello world",
expectedHeaders: map[string][]string{"foo": {"bar"}},
name: "request should work when first peer responds",
key: "sha256:3b8a55c543ccc7ae01c47b1d35af5826a6439a9b91ab0ca96de9967759279896",
expectedStatus: http.StatusOK,
expectedBody: "hello world",
},
{
name: "second peer should respond when first gives error",
key: "sha256:a0daab85ec30e2809a38c32fa676515aba22f481c56fda28637ae964ff398e3d",
expectedStatus: http.StatusOK,
expectedBody: "hello world",
expectedHeaders: map[string][]string{"foo": {"bar"}},
name: "second peer should respond when first gives error",
key: "sha256:a0daab85ec30e2809a38c32fa676515aba22f481c56fda28637ae964ff398e3d",
expectedStatus: http.StatusOK,
expectedBody: "hello world",
},
{
name: "last peer should respond when two first fail",
key: "sha256:11242d2a347bf8ab30b9f92d5ca219bbbedf95df5a8b74631194561497c1fae8",
expectedStatus: http.StatusOK,
expectedBody: "hello world",
expectedHeaders: map[string][]string{"foo": {"bar"}},
name: "last peer should respond when two first fail",
key: "sha256:11242d2a347bf8ab30b9f92d5ca219bbbedf95df5a8b74631194561497c1fae8",
expectedStatus: http.StatusOK,
expectedBody: "hello world",
},
}
for _, tt := range tests {
@ -247,26 +248,7 @@ func TestMirrorHandler(t *testing.T) {
if method == http.MethodHead {
require.Empty(t, b)
}
if tt.expectedHeaders == nil {
require.Empty(t, resp.Header)
}
for k, v := range tt.expectedHeaders {
require.Equal(t, v, resp.Header.Values(k))
}
})
}
}
}
func TestCopyHeader(t *testing.T) {
t.Parallel()
src := http.Header{
"foo": []string{"2", "1"},
}
dst := http.Header{}
copyHeader(dst, src)
require.Equal(t, []string{"2", "1"}, dst.Values("foo"))
}