Compare commits
1 Commits
main
...
feature/re
Author | SHA1 | Date | |
---|---|---|---|
|
548b6b1bdb |
@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
|
- [#436](https://github.com/spegel-org/spegel/pull/436) Replace http util reverese proxy with custom request forwarding.
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
|
@ -6,7 +6,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httputil"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -32,7 +31,7 @@ type Registry struct {
|
|||||||
throttler *throttle.Throttler
|
throttler *throttle.Throttler
|
||||||
ociClient oci.Client
|
ociClient oci.Client
|
||||||
router routing.Router
|
router routing.Router
|
||||||
transport http.RoundTripper
|
httpClient *http.Client
|
||||||
localAddr string
|
localAddr string
|
||||||
resolveRetries int
|
resolveRetries int
|
||||||
resolveTimeout time.Duration
|
resolveTimeout time.Duration
|
||||||
@ -61,7 +60,7 @@ func WithResolveTimeout(resolveTimeout time.Duration) Option {
|
|||||||
|
|
||||||
func WithTransport(transport http.RoundTripper) Option {
|
func WithTransport(transport http.RoundTripper) Option {
|
||||||
return func(r *Registry) {
|
return func(r *Registry) {
|
||||||
r.transport = transport
|
r.httpClient.Transport = transport
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,6 +86,7 @@ func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *R
|
|||||||
r := &Registry{
|
r := &Registry{
|
||||||
ociClient: ociClient,
|
ociClient: ociClient,
|
||||||
router: router,
|
router: router,
|
||||||
|
httpClient: &http.Client{},
|
||||||
resolveRetries: 3,
|
resolveRetries: 3,
|
||||||
resolveTimeout: 1 * time.Second,
|
resolveTimeout: 1 * time.Second,
|
||||||
resolveLatestTag: true,
|
resolveLatestTag: true,
|
||||||
@ -184,10 +184,8 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request with mirror header are proxied.
|
// Requests without mirror header set will be mirrored
|
||||||
if req.Header.Get(MirroredHeaderKey) != "true" {
|
if req.Header.Get(MirroredHeaderKey) != "true" {
|
||||||
// Set mirrored header in request to stop infinite loops
|
|
||||||
req.Header.Set(MirroredHeaderKey, "true")
|
|
||||||
key := dgst.String()
|
key := dgst.String()
|
||||||
if key == "" {
|
if key == "" {
|
||||||
key = ref
|
key = ref
|
||||||
@ -241,6 +239,7 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, key st
|
|||||||
rw.WriteError(http.StatusInternalServerError, err)
|
rw.WriteError(http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Refactor context cancel and mirror channel closing
|
// TODO: Refactor context cancel and mirror channel closing
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -255,36 +254,43 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, key st
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Modify response returns and error on non 200 status code and NOP error handler skips response writing.
|
|
||||||
// If proxy fails no response is written and it is tried again against a different mirror.
|
|
||||||
// If the response writer has been written to it means that the request was properly proxied.
|
|
||||||
succeeded := false
|
|
||||||
scheme := "http"
|
scheme := "http"
|
||||||
if req.TLS != nil {
|
if req.TLS != nil {
|
||||||
scheme = "https"
|
scheme = "https"
|
||||||
}
|
}
|
||||||
u := &url.URL{
|
u := url.URL{
|
||||||
Scheme: scheme,
|
Scheme: scheme,
|
||||||
Host: ipAddr.String(),
|
Host: ipAddr.String(),
|
||||||
|
Path: req.URL.Path,
|
||||||
|
// TODO: Should this error early if not set?
|
||||||
|
RawQuery: fmt.Sprintf("ns=%s", req.URL.Query().Get("ns")),
|
||||||
}
|
}
|
||||||
proxy := httputil.NewSingleHostReverseProxy(u)
|
forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method, u.String(), nil)
|
||||||
proxy.Transport = r.transport
|
if err != nil {
|
||||||
proxy.ErrorHandler = func(_ http.ResponseWriter, _ *http.Request, err error) {
|
rw.WriteError(http.StatusInternalServerError, err)
|
||||||
log.Error(err, "proxy failed attempting next")
|
return
|
||||||
}
|
}
|
||||||
proxy.ModifyResponse = func(resp *http.Response) error {
|
forwardReq.Header.Add(MirroredHeaderKey, "true")
|
||||||
if resp.StatusCode != http.StatusOK {
|
resp, err := r.httpClient.Do(forwardReq)
|
||||||
err := fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status)
|
if err != nil {
|
||||||
log.Error(err, "mirror failed attempting next")
|
log.Error(err, "mirror failed attempting next")
|
||||||
return err
|
|
||||||
}
|
|
||||||
succeeded = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
proxy.ServeHTTP(rw, req)
|
|
||||||
if !succeeded {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.Error(fmt.Errorf("expected mirror to respond with 200 OK but received: %s", resp.Status), "mirror failed attempting next")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for k, v := range resp.Header {
|
||||||
|
for _, vv := range v {
|
||||||
|
rw.Header().Add(k, vv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err = io.Copy(rw, resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
rw.WriteError(http.StatusInternalServerError, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
log.V(5).Info("mirrored request", "url", u.String())
|
log.V(5).Info("mirrored request", "url", u.String())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user