Compare commits
1 Commits
main
...
feature/oc
Author | SHA1 | Date | |
---|---|---|---|
|
c472acfa42 |
@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- [#877](https://github.com/spegel-org/spegel/pull/877) Add support for www authenticate header.
|
||||
- [#878](https://github.com/spegel-org/spegel/pull/878) Add dial timeout configuration in Containerd mirror configuration.
|
||||
- [#889](https://github.com/spegel-org/spegel/pull/889) Add support for content create events.
|
||||
- [#895](https://github.com/spegel-org/spegel/pull/895) Refactor mirroring to use OCI file.
|
||||
|
||||
### Changed
|
||||
|
||||
|
123
internal/ocifs/ocifs.go
Normal file
123
internal/ocifs/ocifs.go
Normal file
@ -0,0 +1,123 @@
|
||||
package ocifs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
|
||||
"github.com/spegel-org/spegel/pkg/oci"
|
||||
"github.com/spegel-org/spegel/pkg/routing"
|
||||
)
|
||||
|
||||
type OCIFile interface {
|
||||
io.ReadCloser
|
||||
|
||||
Descriptor() (ocispec.Descriptor, error)
|
||||
}
|
||||
|
||||
var _ OCIFile = &RoutedFile{}
|
||||
|
||||
type RoutedFile struct {
|
||||
fetch func() (*http.Response, error)
|
||||
}
|
||||
|
||||
func NewRoutedFile(ctx context.Context, router routing.Router, dist oci.DistributionPath, method string, forwardScheme string, timeout time.Duration, retries int) (*RoutedFile, error) {
|
||||
//nolint:bodyclose // Response body is only closed in Close().
|
||||
fetch := sync.OnceValues(func() (*http.Response, error) {
|
||||
resolveCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
peerCh, err := router.Resolve(resolveCtx, dist.Reference(), retries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u := dist.URL()
|
||||
u.Scheme = forwardScheme
|
||||
|
||||
mirrorAttempts := 0
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("routing for OCI file has been cancelled: %w", ctx.Err())
|
||||
case peer, ok := <-peerCh:
|
||||
// Channel closed means no more mirrors will be received and max retries has been reached.
|
||||
if !ok {
|
||||
err = fmt.Errorf("mirror with OCI file %s could not be found", dist.Reference())
|
||||
if mirrorAttempts > 0 {
|
||||
err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u.Host = peer.String()
|
||||
resp, err := doRequest(ctx, http.DefaultClient, method, u)
|
||||
if err != nil {
|
||||
logr.FromContextOrDiscard(ctx).Error(err, "request to mirror failed", "attempt", mirrorAttempts, "path", u.Path, "mirror", peer)
|
||||
continue
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
})
|
||||
f := &RoutedFile{
|
||||
fetch: fetch,
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (f *RoutedFile) Read(p []byte) (n int, err error) {
|
||||
//nolint:bodyclose // Response body is only closed in Close().
|
||||
resp, err := f.fetch()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return resp.Body.Read(p)
|
||||
}
|
||||
|
||||
func (f *RoutedFile) Close() error {
|
||||
resp, err := f.fetch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, copyErr := io.Copy(io.Discard, resp.Body)
|
||||
closeErr := resp.Body.Close()
|
||||
return errors.Join(closeErr, copyErr)
|
||||
}
|
||||
|
||||
func (f *RoutedFile) Descriptor() (ocispec.Descriptor, error) {
|
||||
//nolint:bodyclose // Response body is only closed in Close().
|
||||
resp, err := f.fetch()
|
||||
if err != nil {
|
||||
return ocispec.Descriptor{}, err
|
||||
}
|
||||
desc, err := oci.DescriptorFromHeader(resp.Header)
|
||||
if err != nil {
|
||||
return ocispec.Descriptor{}, err
|
||||
}
|
||||
return desc, nil
|
||||
}
|
||||
|
||||
func doRequest(ctx context.Context, client *http.Client, method string, u *url.URL) (*http.Response, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, method, u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("X-Spegel-Mirrored", "true")
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//nolint:staticcheck // Ignore until replaced with status error.
|
||||
if !(resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent) {
|
||||
return nil, fmt.Errorf("expected status code %s but got %s", http.StatusText(http.StatusOK), resp.Status)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
@ -1,13 +1,10 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
@ -15,6 +12,7 @@ import (
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
|
||||
"github.com/spegel-org/spegel/internal/ocifs"
|
||||
"github.com/spegel-org/spegel/pkg/httpx"
|
||||
"github.com/spegel-org/spegel/pkg/metrics"
|
||||
"github.com/spegel-org/spegel/pkg/oci"
|
||||
@ -235,8 +233,6 @@ func (r *Registry) registryHandler(rw httpx.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
|
||||
func (r *Registry) handleMirror(rw httpx.ResponseWriter, req *http.Request, dist oci.DistributionPath) {
|
||||
log := r.log.WithValues("ref", dist.Reference(), "path", req.URL.Path)
|
||||
|
||||
defer func() {
|
||||
cacheType := "hit"
|
||||
if rw.Status() != http.StatusOK {
|
||||
@ -251,44 +247,31 @@ func (r *Registry) handleMirror(rw httpx.ResponseWriter, req *http.Request, dist
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve mirror with the requested reference
|
||||
resolveCtx, cancel := context.WithTimeout(req.Context(), r.resolveTimeout)
|
||||
defer cancel()
|
||||
resolveCtx = logr.NewContext(resolveCtx, log)
|
||||
peerCh, err := r.router.Resolve(resolveCtx, dist.Reference(), r.resolveRetries)
|
||||
forwardScheme := "http"
|
||||
if req.TLS != nil {
|
||||
forwardScheme = "https"
|
||||
}
|
||||
f, err := ocifs.NewRoutedFile(logr.NewContext(req.Context(), r.log), r.router, dist, req.Method, forwardScheme, r.resolveTimeout, r.resolveRetries)
|
||||
if err != nil {
|
||||
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("error occurred when attempting to resolve mirrors: %w", err))
|
||||
rw.WriteError(http.StatusNotFound, err)
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
mirrorAttempts := 0
|
||||
for {
|
||||
select {
|
||||
case <-req.Context().Done():
|
||||
// Request has been closed by server or client. No use continuing.
|
||||
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", dist.Reference(), resolveCtx.Err()))
|
||||
return
|
||||
case peer, ok := <-peerCh:
|
||||
// Channel closed means no more mirrors will be received and max retries has been reached.
|
||||
if !ok {
|
||||
err = fmt.Errorf("mirror with image component %s could not be found", dist.Reference())
|
||||
if mirrorAttempts > 0 {
|
||||
err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts))
|
||||
}
|
||||
rw.WriteError(http.StatusNotFound, err)
|
||||
return
|
||||
}
|
||||
desc, err := f.Descriptor()
|
||||
if err != nil {
|
||||
rw.WriteError(http.StatusNotFound, err)
|
||||
return
|
||||
}
|
||||
oci.WriteDescriptorToHeader(desc, rw.Header())
|
||||
|
||||
mirrorAttempts++
|
||||
|
||||
err := forwardRequest(r.client, r.bufferPool, req, rw, peer)
|
||||
if err != nil {
|
||||
log.Error(err, "request to mirror failed", "attempt", mirrorAttempts, "path", req.URL.Path, "mirror", peer)
|
||||
continue
|
||||
}
|
||||
log.V(4).Info("mirrored request", "path", req.URL.Path, "mirror", peer)
|
||||
return
|
||||
}
|
||||
//nolint: errcheck // Ignore
|
||||
buf := r.bufferPool.Get().(*[]byte)
|
||||
defer r.bufferPool.Put(buf)
|
||||
_, err = io.CopyBuffer(rw, f, *buf)
|
||||
if err != nil {
|
||||
rw.WriteError(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@ -342,58 +325,3 @@ func (r *Registry) handleBlob(rw httpx.ResponseWriter, req *http.Request, dist o
|
||||
|
||||
http.ServeContent(rw, req, "", time.Time{}, rc)
|
||||
}
|
||||
|
||||
func forwardRequest(client *http.Client, bufferPool *sync.Pool, req *http.Request, rw http.ResponseWriter, addrPort netip.AddrPort) error {
|
||||
// Do request to mirror.
|
||||
forwardScheme := "http"
|
||||
if req.TLS != nil {
|
||||
forwardScheme = "https"
|
||||
}
|
||||
u := &url.URL{
|
||||
Scheme: forwardScheme,
|
||||
Host: addrPort.String(),
|
||||
Path: req.URL.Path,
|
||||
RawQuery: req.URL.RawQuery,
|
||||
}
|
||||
forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method, u.String(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
copyHeader(forwardReq.Header, req.Header)
|
||||
forwardResp, err := client.Do(forwardReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer forwardResp.Body.Close()
|
||||
|
||||
// Clear body and try next if non 200 response.
|
||||
//nolint:staticcheck // Keep things readable.
|
||||
if !(forwardResp.StatusCode == http.StatusOK || forwardResp.StatusCode == http.StatusPartialContent) {
|
||||
_, err = io.Copy(io.Discard, forwardResp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("expected mirror to respond with 200 OK but received: %s", forwardResp.Status)
|
||||
}
|
||||
|
||||
// TODO (phillebaba): Is it possible to retry if copy fails half way through?
|
||||
// Copy forward response to response writer.
|
||||
copyHeader(rw.Header(), forwardResp.Header)
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
//nolint: errcheck // Ignore
|
||||
buf := bufferPool.Get().(*[]byte)
|
||||
defer bufferPool.Put(buf)
|
||||
_, err = io.CopyBuffer(rw, forwardResp.Body, *buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func copyHeader(dst, src http.Header) {
|
||||
for k, vv := range src {
|
||||
for _, v := range vv {
|
||||
dst.Add(k, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,12 +6,16 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/netip"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
tlog "github.com/go-logr/logr/testing"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/spegel-org/spegel/pkg/httpx"
|
||||
"github.com/spegel-org/spegel/pkg/oci"
|
||||
"github.com/spegel-org/spegel/pkg/routing"
|
||||
)
|
||||
@ -143,7 +147,6 @@ func TestMirrorHandler(t *testing.T) {
|
||||
|
||||
badSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Header().Set("foo", "bar")
|
||||
if r.Method == http.MethodGet {
|
||||
//nolint:errcheck // ignore
|
||||
w.Write([]byte("hello world"))
|
||||
@ -154,10 +157,14 @@ func TestMirrorHandler(t *testing.T) {
|
||||
})
|
||||
badAddrPort := netip.MustParseAddrPort(badSvr.Listener.Addr().String())
|
||||
goodSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("foo", "bar")
|
||||
w.Header().Set(httpx.HeaderContentLength, "0")
|
||||
w.Header().Set(httpx.HeaderContentType, "application/octet-stream")
|
||||
w.Header().Set(oci.HeaderDockerDigest, ocispec.DescriptorEmptyJSON.Digest.String())
|
||||
if r.Method == http.MethodGet {
|
||||
b := []byte("hello world")
|
||||
w.Header().Set(httpx.HeaderContentLength, strconv.FormatInt(int64(len(b)), 10))
|
||||
//nolint:errcheck // ignore
|
||||
w.Write([]byte("hello world"))
|
||||
w.Write(b)
|
||||
}
|
||||
}))
|
||||
t.Cleanup(func() {
|
||||
@ -177,50 +184,44 @@ func TestMirrorHandler(t *testing.T) {
|
||||
"sha256:11242d2a347bf8ab30b9f92d5ca219bbbedf95df5a8b74631194561497c1fae8": {badAddrPort, badAddrPort, goodAddrPort},
|
||||
}
|
||||
router := routing.NewMemoryRouter(resolver, netip.AddrPort{})
|
||||
reg, err := NewRegistry(oci.NewMemory(), router)
|
||||
reg, err := NewRegistry(oci.NewMemory(), router, WithLogger(tlog.NewTestLogger(t)))
|
||||
require.NoError(t, err)
|
||||
|
||||
tests := []struct {
|
||||
expectedHeaders map[string][]string
|
||||
name string
|
||||
key string
|
||||
expectedBody string
|
||||
expectedStatus int
|
||||
name string
|
||||
key string
|
||||
expectedBody string
|
||||
expectedStatus int
|
||||
}{
|
||||
{
|
||||
name: "request should timeout when no peers exists",
|
||||
key: "no-peers",
|
||||
expectedStatus: http.StatusNotFound,
|
||||
expectedBody: "",
|
||||
expectedHeaders: nil,
|
||||
name: "request should timeout when no peers exists",
|
||||
key: "no-peers",
|
||||
expectedStatus: http.StatusNotFound,
|
||||
expectedBody: "",
|
||||
},
|
||||
{
|
||||
name: "request should not timeout and give 404 if all peers fail",
|
||||
key: "sha256:c3e30fbcf3b231356a1efbd30a8ccec75134a7a8b45217ede97f4ff483540b04",
|
||||
expectedStatus: http.StatusNotFound,
|
||||
expectedBody: "",
|
||||
expectedHeaders: nil,
|
||||
name: "request should not timeout and give 404 if all peers fail",
|
||||
key: "sha256:c3e30fbcf3b231356a1efbd30a8ccec75134a7a8b45217ede97f4ff483540b04",
|
||||
expectedStatus: http.StatusNotFound,
|
||||
expectedBody: "",
|
||||
},
|
||||
{
|
||||
name: "request should work when first peer responds",
|
||||
key: "sha256:3b8a55c543ccc7ae01c47b1d35af5826a6439a9b91ab0ca96de9967759279896",
|
||||
expectedStatus: http.StatusOK,
|
||||
expectedBody: "hello world",
|
||||
expectedHeaders: map[string][]string{"foo": {"bar"}},
|
||||
name: "request should work when first peer responds",
|
||||
key: "sha256:3b8a55c543ccc7ae01c47b1d35af5826a6439a9b91ab0ca96de9967759279896",
|
||||
expectedStatus: http.StatusOK,
|
||||
expectedBody: "hello world",
|
||||
},
|
||||
{
|
||||
name: "second peer should respond when first gives error",
|
||||
key: "sha256:a0daab85ec30e2809a38c32fa676515aba22f481c56fda28637ae964ff398e3d",
|
||||
expectedStatus: http.StatusOK,
|
||||
expectedBody: "hello world",
|
||||
expectedHeaders: map[string][]string{"foo": {"bar"}},
|
||||
name: "second peer should respond when first gives error",
|
||||
key: "sha256:a0daab85ec30e2809a38c32fa676515aba22f481c56fda28637ae964ff398e3d",
|
||||
expectedStatus: http.StatusOK,
|
||||
expectedBody: "hello world",
|
||||
},
|
||||
{
|
||||
name: "last peer should respond when two first fail",
|
||||
key: "sha256:11242d2a347bf8ab30b9f92d5ca219bbbedf95df5a8b74631194561497c1fae8",
|
||||
expectedStatus: http.StatusOK,
|
||||
expectedBody: "hello world",
|
||||
expectedHeaders: map[string][]string{"foo": {"bar"}},
|
||||
name: "last peer should respond when two first fail",
|
||||
key: "sha256:11242d2a347bf8ab30b9f92d5ca219bbbedf95df5a8b74631194561497c1fae8",
|
||||
expectedStatus: http.StatusOK,
|
||||
expectedBody: "hello world",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
@ -247,26 +248,7 @@ func TestMirrorHandler(t *testing.T) {
|
||||
if method == http.MethodHead {
|
||||
require.Empty(t, b)
|
||||
}
|
||||
|
||||
if tt.expectedHeaders == nil {
|
||||
require.Empty(t, resp.Header)
|
||||
}
|
||||
for k, v := range tt.expectedHeaders {
|
||||
require.Equal(t, v, resp.Header.Values(k))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCopyHeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
src := http.Header{
|
||||
"foo": []string{"2", "1"},
|
||||
}
|
||||
dst := http.Header{}
|
||||
copyHeader(dst, src)
|
||||
|
||||
require.Equal(t, []string{"2", "1"}, dst.Values("foo"))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user