Compare commits

...

2 Commits

Author SHA1 Message Date
Anders Qvist
bb68b64aa1
Use a single rate limiter for all requests. 2024-02-21 18:15:31 +01:00
Philip Laine
a5c16a73d2 Add support for io throttling 2024-02-21 11:32:39 +01:00
11 changed files with 220 additions and 13 deletions

View File

@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#335](https://github.com/XenitAB/spegel/pull/335) Add k3s to compatibility guide.
- [#339](https://github.com/XenitAB/spegel/pull/339) Extend OCI client tests.
- [#339](https://github.com/XenitAB/spegel/pull/339) Extend OCI client tests.
- [#365](https://github.com/XenitAB/spegel/pull/365) Add support for throttling blob write speed.
### Changed

View File

@ -83,6 +83,7 @@ spec:
| serviceMonitor.labels | object | `{}` | Service monitor specific labels for prometheus to discover servicemonitor. |
| serviceMonitor.scrapeTimeout | string | `"30s"` | Prometheus scrape interval timeout. |
| spegel.additionalMirrorRegistries | list | `[]` | Additional target mirror registries other than Spegel. |
| spegel.blobSpeed | string | `""` | Maximum write speed per request when serving blob layers. Should be an integer followed by unit Bps, KBps, MBps, GBps, or TBps. |
| spegel.containerdMirrorAdd | bool | `true` | If true Spegel will add mirror configuration to the node. |
| spegel.containerdNamespace | string | `"k8s.io"` | Containerd namespace where images are stored. |
| spegel.containerdRegistryConfigPath | string | `"/etc/containerd/certs.d"` | Path to Containerd mirror configuration. |

View File

@ -92,6 +92,9 @@ spec:
- --leader-election-name={{ .Release.Name }}-leader-election
- --resolve-latest-tag={{ .Values.spegel.resolveLatestTag }}
- --local-addr=$(NODE_IP):{{ .Values.service.registry.hostPort }}
{{- with .Values.spegel.blobSpeed }}
- --blob-speed={{ . }}
{{- end }}
env:
- name: NODE_IP
valueFrom:

View File

@ -139,3 +139,5 @@ spegel:
resolveTags: true
# -- When true latest tags will be resolved to digests.
resolveLatestTag: true
# -- Maximum write speed per request when serving blob layers. Should be an integer followed by unit Bps, KBps, MBps, GBps, or TBps.
blobSpeed: ""

2
go.mod
View File

@ -28,6 +28,7 @@ require (
go.etcd.io/bbolt v1.3.7
go.uber.org/zap v1.26.0
golang.org/x/sync v0.6.0
golang.org/x/time v0.5.0
k8s.io/client-go v0.27.4
k8s.io/cri-api v0.27.4
k8s.io/klog/v2 v2.90.1
@ -197,7 +198,6 @@ require (
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
gonum.org/v1/gonum v0.13.0 // indirect
google.golang.org/appengine v1.6.7 // indirect

27
main.go
View File

@ -27,6 +27,7 @@ import (
"github.com/xenitab/spegel/pkg/registry"
"github.com/xenitab/spegel/pkg/routing"
"github.com/xenitab/spegel/pkg/state"
"github.com/xenitab/spegel/pkg/throttle"
)
type ConfigurationCmd struct {
@ -47,17 +48,18 @@ type BootstrapConfig struct {
type RegistryCmd struct {
BootstrapConfig
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
MetricsAddr string `arg:"--metrics-addr,required" help:"address to serve metrics."`
LocalAddr string `arg:"--local-addr,required" help:"Address that the local Spegel instance will be reached at."`
ContainerdSock string `arg:"--containerd-sock" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
ContainerdNamespace string `arg:"--containerd-namespace" default:"k8s.io" help:"Containerd namespace to fetch images from."`
RouterAddr string `arg:"--router-addr,required" help:"address to serve router."`
RegistryAddr string `arg:"--registry-addr,required" help:"address to server image registry."`
Registries []url.URL `arg:"--registries,required" help:"registries that are configured to be mirrored."`
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout" default:"5s" help:"Max duration spent finding a mirror."`
MirrorResolveRetries int `arg:"--mirror-resolve-retries" default:"3" help:"Max amount of mirrors to attempt."`
ResolveLatestTag bool `arg:"--resolve-latest-tag" default:"true" help:"When true latest tags will be resolved to digests."`
BlobSpeed *throttle.Byterate `arg:"--blob-speed" help:"Maximum write speed per request when serving blob layers. Should be an integer followed by unit Bps, KBps, MBps, GBps, or TBps."`
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
MetricsAddr string `arg:"--metrics-addr,required" help:"address to serve metrics."`
LocalAddr string `arg:"--local-addr,required" help:"Address that the local Spegel instance will be reached at."`
ContainerdSock string `arg:"--containerd-sock" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
ContainerdNamespace string `arg:"--containerd-namespace" default:"k8s.io" help:"Containerd namespace to fetch images from."`
RouterAddr string `arg:"--router-addr,required" help:"address to serve router."`
RegistryAddr string `arg:"--registry-addr,required" help:"address to server image registry."`
Registries []url.URL `arg:"--registries,required" help:"registries that are configured to be mirrored."`
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout" default:"5s" help:"Max duration spent finding a mirror."`
MirrorResolveRetries int `arg:"--mirror-resolve-retries" default:"3" help:"Max amount of mirrors to attempt."`
ResolveLatestTag bool `arg:"--resolve-latest-tag" default:"true" help:"When true latest tags will be resolved to digests."`
}
type Arguments struct {
@ -176,6 +178,9 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
registry.WithResolveTimeout(args.MirrorResolveTimeout),
registry.WithLocalAddress(args.LocalAddr),
}
if args.BlobSpeed != nil {
registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed))
}
reg := registry.NewRegistry(ociClient, router, registryOpts...)
regSrv := reg.Server(args.RegistryAddr, log)
g.Go(func() error {

View File

@ -3,6 +3,7 @@ package registry
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
@ -16,14 +17,17 @@ import (
"github.com/go-logr/logr"
"github.com/opencontainers/go-digest"
pkggin "github.com/xenitab/pkg/gin"
"golang.org/x/time/rate"
"github.com/xenitab/spegel/pkg/metrics"
"github.com/xenitab/spegel/pkg/oci"
"github.com/xenitab/spegel/pkg/routing"
"github.com/xenitab/spegel/pkg/throttle"
)
const (
MirroredHeaderKey = "X-Spegel-Mirrored"
burstLimit = 1024 * 1024 * 1024 // 1GB
)
type Option func(*Registry)
@ -58,7 +62,16 @@ func WithLocalAddress(localAddr string) Option {
}
}
func WithBlobSpeed(blobSpeed throttle.Byterate) Option {
return func(r *Registry) {
limiter := rate.NewLimiter(rate.Limit(blobSpeed), burstLimit)
limiter.AllowN(time.Now(), burstLimit)
r.rateLimiter = limiter
}
}
type Registry struct {
rateLimiter *rate.Limiter
ociClient oci.Client
router routing.Router
transport http.RoundTripper
@ -294,7 +307,11 @@ func (r *Registry) handleBlob(c *gin.Context, dgst digest.Digest) {
if c.Request.Method == http.MethodHead {
return
}
err = r.ociClient.CopyLayer(c, dgst, c.Writer)
var writer io.Writer = c.Writer
if r.rateLimiter != nil {
writer = throttle.NewWriter(c.Writer, r.rateLimiter)
}
err = r.ociClient.CopyLayer(c, dgst, writer)
if err != nil {
//nolint:errcheck // ignore
c.AbortWithError(http.StatusInternalServerError, err)

48
pkg/throttle/byterate.go Normal file
View File

@ -0,0 +1,48 @@
package throttle
import (
"fmt"
"regexp"
"strconv"
)
var unmarshalRegex = regexp.MustCompile(`^(\d+)\s?([KMGT]?Bps)$`)
type Byterate int
const (
Bps Byterate = 1
KBps = 1024 * Bps
MBps = 1024 * KBps
GBps = 1024 * MBps
TBps = 1024 * GBps
)
func (br *Byterate) UnmarshalText(b []byte) error {
comps := unmarshalRegex.FindStringSubmatch(string(b))
if len(comps) != 3 {
return fmt.Errorf("invalid byterate format %s should be n Bps, n KBps, n MBps, n GBps, or n TBps", string(b))
}
v, err := strconv.Atoi(comps[1])
if err != nil {
return err
}
unitStr := comps[2]
var unit Byterate
switch unitStr {
case "Bps":
unit = Bps
case "KBps":
unit = KBps
case "MBps":
unit = MBps
case "GBps":
unit = GBps
case "TBps":
unit = TBps
default:
return fmt.Errorf("unknown unit %s", unitStr)
}
*br = Byterate(v) * unit
return nil
}

View File

@ -0,0 +1,67 @@
package throttle
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestByterateUnmarshalValid(t *testing.T) {
tests := []struct {
input string
expected Byterate
}{
{
input: "1 Bps",
expected: 1 * Bps,
},
{
input: "31 KBps",
expected: 31 * KBps,
},
{
input: "42 MBps",
expected: 42 * MBps,
},
{
input: "120 GBps",
expected: 120 * GBps,
},
{
input: "3TBps",
expected: 3 * TBps,
},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
var br Byterate
err := br.UnmarshalText([]byte(tt.input))
require.NoError(t, err)
require.Equal(t, tt.expected, br)
})
}
}
func TestByterateUnmarshalInvalid(t *testing.T) {
tests := []struct {
input string
}{
{
input: "foobar",
},
{
input: "1 Mbps",
},
{
input: "1.1 MBps",
},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
var br Byterate
err := br.UnmarshalText([]byte(tt.input))
require.EqualError(t, err, fmt.Sprintf("invalid byterate format %s should be n Bps, n KBps, n MBps, n GBps, or n TBps", tt.input))
})
}
}

34
pkg/throttle/writer.go Normal file
View File

@ -0,0 +1,34 @@
package throttle
import (
"fmt"
"io"
"time"
"golang.org/x/time/rate"
)
type writer struct {
limiter *rate.Limiter
writer io.Writer
}
func NewWriter(w io.Writer, limiter *rate.Limiter) io.Writer {
return &writer{
limiter: limiter,
writer: w,
}
}
func (w *writer) Write(p []byte) (int, error) {
n, err := w.writer.Write(p)
if err != nil {
return 0, err
}
r := w.limiter.ReserveN(time.Now(), n)
if !r.OK() {
return n, fmt.Errorf("write size %d exceeds limiters burst %d", n, w.limiter.Burst())
}
time.Sleep(r.Delay())
return n, nil
}

View File

@ -0,0 +1,28 @@
package throttle
import (
"bytes"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
func TestWriter(t *testing.T) {
limit := rate.Limit(500 * Bps)
limiter := rate.NewLimiter(limit, 1024*1024)
limiter.AllowN(time.Now(), 1024*1024)
w := NewWriter(bytes.NewBuffer([]byte{}), limiter)
chunkSize := 100
start := time.Now()
for i := 0; i < 10; i++ {
b := make([]byte, chunkSize)
n, err := w.Write(b)
require.NoError(t, err)
require.Equal(t, chunkSize, n)
}
d := time.Since(start)
require.Greater(t, d, 2*time.Second)
require.Less(t, d, 3*time.Second)
}