Compare commits

...

1 Commits

Author SHA1 Message Date
Philip Laine
548b6b1bdb Replace http util reverese proxy with custom request forwarding 2024-04-14 12:45:59 +02:00
2 changed files with 34 additions and 26 deletions

View File

@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- [#436](https://github.com/spegel-org/spegel/pull/436) Replace http util reverese proxy with custom request forwarding.
### Deprecated ### Deprecated
### Removed ### Removed

View File

@ -6,7 +6,6 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"net/http/httputil"
"net/url" "net/url"
"path" "path"
"strconv" "strconv"
@ -32,7 +31,7 @@ type Registry struct {
throttler *throttle.Throttler throttler *throttle.Throttler
ociClient oci.Client ociClient oci.Client
router routing.Router router routing.Router
transport http.RoundTripper httpClient *http.Client
localAddr string localAddr string
resolveRetries int resolveRetries int
resolveTimeout time.Duration resolveTimeout time.Duration
@ -61,7 +60,7 @@ func WithResolveTimeout(resolveTimeout time.Duration) Option {
func WithTransport(transport http.RoundTripper) Option { func WithTransport(transport http.RoundTripper) Option {
return func(r *Registry) { return func(r *Registry) {
r.transport = transport r.httpClient.Transport = transport
} }
} }
@ -87,6 +86,7 @@ func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *R
r := &Registry{ r := &Registry{
ociClient: ociClient, ociClient: ociClient,
router: router, router: router,
httpClient: &http.Client{},
resolveRetries: 3, resolveRetries: 3,
resolveTimeout: 1 * time.Second, resolveTimeout: 1 * time.Second,
resolveLatestTag: true, resolveLatestTag: true,
@ -184,10 +184,8 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) {
} }
} }
// Request with mirror header are proxied. // Requests without mirror header set will be mirrored
if req.Header.Get(MirroredHeaderKey) != "true" { if req.Header.Get(MirroredHeaderKey) != "true" {
// Set mirrored header in request to stop infinite loops
req.Header.Set(MirroredHeaderKey, "true")
key := dgst.String() key := dgst.String()
if key == "" { if key == "" {
key = ref key = ref
@ -241,6 +239,7 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, key st
rw.WriteError(http.StatusInternalServerError, err) rw.WriteError(http.StatusInternalServerError, err)
return return
} }
// TODO: Refactor context cancel and mirror channel closing // TODO: Refactor context cancel and mirror channel closing
for { for {
select { select {
@ -255,36 +254,43 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, key st
return return
} }
// Modify response returns and error on non 200 status code and NOP error handler skips response writing.
// If proxy fails no response is written and it is tried again against a different mirror.
// If the response writer has been written to it means that the request was properly proxied.
succeeded := false
scheme := "http" scheme := "http"
if req.TLS != nil { if req.TLS != nil {
scheme = "https" scheme = "https"
} }
u := &url.URL{ u := url.URL{
Scheme: scheme, Scheme: scheme,
Host: ipAddr.String(), Host: ipAddr.String(),
Path: req.URL.Path,
// TODO: Should this error early if not set?
RawQuery: fmt.Sprintf("ns=%s", req.URL.Query().Get("ns")),
} }
proxy := httputil.NewSingleHostReverseProxy(u) forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method, u.String(), nil)
proxy.Transport = r.transport if err != nil {
proxy.ErrorHandler = func(_ http.ResponseWriter, _ *http.Request, err error) { rw.WriteError(http.StatusInternalServerError, err)
log.Error(err, "proxy failed attempting next") return
} }
proxy.ModifyResponse = func(resp *http.Response) error { forwardReq.Header.Add(MirroredHeaderKey, "true")
if resp.StatusCode != http.StatusOK { resp, err := r.httpClient.Do(forwardReq)
err := fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status) if err != nil {
log.Error(err, "mirror failed attempting next") log.Error(err, "mirror failed attempting next")
return err
}
succeeded = true
return nil
}
proxy.ServeHTTP(rw, req)
if !succeeded {
break break
} }
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Error(fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status), "mirror failed attempting next")
break
}
for k, v := range resp.Header {
for _, vv := range v {
rw.Header().Add(k, vv)
}
}
_, err = io.Copy(rw, resp.Body)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
log.V(5).Info("mirrored request", "url", u.String()) log.V(5).Info("mirrored request", "url", u.String())
return return
} }