Compare commits
2 Commits
main
...
single-lim
Author | SHA1 | Date | |
---|---|---|---|
|
bb68b64aa1 | ||
|
a5c16a73d2 |
@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- [#335](https://github.com/XenitAB/spegel/pull/335) Add k3s to compatibility guide.
|
||||
- [#339](https://github.com/XenitAB/spegel/pull/339) Extend OCI client tests.
|
||||
- [#339](https://github.com/XenitAB/spegel/pull/339) Extend OCI client tests.
|
||||
- [#365](https://github.com/XenitAB/spegel/pull/365) Add support for throttling blob write speed.
|
||||
|
||||
### Changed
|
||||
|
||||
|
@ -83,6 +83,7 @@ spec:
|
||||
| serviceMonitor.labels | object | `{}` | Service monitor specific labels for prometheus to discover servicemonitor. |
|
||||
| serviceMonitor.scrapeTimeout | string | `"30s"` | Prometheus scrape interval timeout. |
|
||||
| spegel.additionalMirrorRegistries | list | `[]` | Additional target mirror registries other than Spegel. |
|
||||
| spegel.blobSpeed | string | `""` | Maximum write speed per request when serving blob layers. Should be an integer followed by unit Bps, KBps, MBps, GBps, or TBps. |
|
||||
| spegel.containerdMirrorAdd | bool | `true` | If true Spegel will add mirror configuration to the node. |
|
||||
| spegel.containerdNamespace | string | `"k8s.io"` | Containerd namespace where images are stored. |
|
||||
| spegel.containerdRegistryConfigPath | string | `"/etc/containerd/certs.d"` | Path to Containerd mirror configuration. |
|
||||
|
@ -92,6 +92,9 @@ spec:
|
||||
- --leader-election-name={{ .Release.Name }}-leader-election
|
||||
- --resolve-latest-tag={{ .Values.spegel.resolveLatestTag }}
|
||||
- --local-addr=$(NODE_IP):{{ .Values.service.registry.hostPort }}
|
||||
{{- with .Values.spegel.blobSpeed }}
|
||||
- --blob-speed={{ . }}
|
||||
{{- end }}
|
||||
env:
|
||||
- name: NODE_IP
|
||||
valueFrom:
|
||||
|
@ -139,3 +139,5 @@ spegel:
|
||||
resolveTags: true
|
||||
# -- When true latest tags will be resolved to digests.
|
||||
resolveLatestTag: true
|
||||
# -- Maximum write speed per request when serving blob layers. Should be an integer followed by unit Bps, KBps, MBps, GBps, or TBps.
|
||||
blobSpeed: ""
|
||||
|
2
go.mod
2
go.mod
@ -28,6 +28,7 @@ require (
|
||||
go.etcd.io/bbolt v1.3.7
|
||||
go.uber.org/zap v1.26.0
|
||||
golang.org/x/sync v0.6.0
|
||||
golang.org/x/time v0.5.0
|
||||
k8s.io/client-go v0.27.4
|
||||
k8s.io/cri-api v0.27.4
|
||||
k8s.io/klog/v2 v2.90.1
|
||||
@ -197,7 +198,6 @@ require (
|
||||
golang.org/x/sys v0.16.0 // indirect
|
||||
golang.org/x/term v0.16.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
|
||||
gonum.org/v1/gonum v0.13.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
|
27
main.go
27
main.go
@ -27,6 +27,7 @@ import (
|
||||
"github.com/xenitab/spegel/pkg/registry"
|
||||
"github.com/xenitab/spegel/pkg/routing"
|
||||
"github.com/xenitab/spegel/pkg/state"
|
||||
"github.com/xenitab/spegel/pkg/throttle"
|
||||
)
|
||||
|
||||
type ConfigurationCmd struct {
|
||||
@ -47,17 +48,18 @@ type BootstrapConfig struct {
|
||||
|
||||
type RegistryCmd struct {
|
||||
BootstrapConfig
|
||||
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
|
||||
MetricsAddr string `arg:"--metrics-addr,required" help:"address to serve metrics."`
|
||||
LocalAddr string `arg:"--local-addr,required" help:"Address that the local Spegel instance will be reached at."`
|
||||
ContainerdSock string `arg:"--containerd-sock" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
|
||||
ContainerdNamespace string `arg:"--containerd-namespace" default:"k8s.io" help:"Containerd namespace to fetch images from."`
|
||||
RouterAddr string `arg:"--router-addr,required" help:"address to serve router."`
|
||||
RegistryAddr string `arg:"--registry-addr,required" help:"address to server image registry."`
|
||||
Registries []url.URL `arg:"--registries,required" help:"registries that are configured to be mirrored."`
|
||||
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout" default:"5s" help:"Max duration spent finding a mirror."`
|
||||
MirrorResolveRetries int `arg:"--mirror-resolve-retries" default:"3" help:"Max amount of mirrors to attempt."`
|
||||
ResolveLatestTag bool `arg:"--resolve-latest-tag" default:"true" help:"When true latest tags will be resolved to digests."`
|
||||
BlobSpeed *throttle.Byterate `arg:"--blob-speed" help:"Maximum write speed per request when serving blob layers. Should be an integer followed by unit Bps, KBps, MBps, GBps, or TBps."`
|
||||
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
|
||||
MetricsAddr string `arg:"--metrics-addr,required" help:"address to serve metrics."`
|
||||
LocalAddr string `arg:"--local-addr,required" help:"Address that the local Spegel instance will be reached at."`
|
||||
ContainerdSock string `arg:"--containerd-sock" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
|
||||
ContainerdNamespace string `arg:"--containerd-namespace" default:"k8s.io" help:"Containerd namespace to fetch images from."`
|
||||
RouterAddr string `arg:"--router-addr,required" help:"address to serve router."`
|
||||
RegistryAddr string `arg:"--registry-addr,required" help:"address to server image registry."`
|
||||
Registries []url.URL `arg:"--registries,required" help:"registries that are configured to be mirrored."`
|
||||
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout" default:"5s" help:"Max duration spent finding a mirror."`
|
||||
MirrorResolveRetries int `arg:"--mirror-resolve-retries" default:"3" help:"Max amount of mirrors to attempt."`
|
||||
ResolveLatestTag bool `arg:"--resolve-latest-tag" default:"true" help:"When true latest tags will be resolved to digests."`
|
||||
}
|
||||
|
||||
type Arguments struct {
|
||||
@ -176,6 +178,9 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
|
||||
registry.WithResolveTimeout(args.MirrorResolveTimeout),
|
||||
registry.WithLocalAddress(args.LocalAddr),
|
||||
}
|
||||
if args.BlobSpeed != nil {
|
||||
registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed))
|
||||
}
|
||||
reg := registry.NewRegistry(ociClient, router, registryOpts...)
|
||||
regSrv := reg.Server(args.RegistryAddr, log)
|
||||
g.Go(func() error {
|
||||
|
@ -3,6 +3,7 @@ package registry
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
@ -16,14 +17,17 @@ import (
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/opencontainers/go-digest"
|
||||
pkggin "github.com/xenitab/pkg/gin"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/xenitab/spegel/pkg/metrics"
|
||||
"github.com/xenitab/spegel/pkg/oci"
|
||||
"github.com/xenitab/spegel/pkg/routing"
|
||||
"github.com/xenitab/spegel/pkg/throttle"
|
||||
)
|
||||
|
||||
const (
|
||||
MirroredHeaderKey = "X-Spegel-Mirrored"
|
||||
burstLimit = 1024 * 1024 * 1024 // 1GB
|
||||
)
|
||||
|
||||
type Option func(*Registry)
|
||||
@ -58,7 +62,16 @@ func WithLocalAddress(localAddr string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithBlobSpeed(blobSpeed throttle.Byterate) Option {
|
||||
return func(r *Registry) {
|
||||
limiter := rate.NewLimiter(rate.Limit(blobSpeed), burstLimit)
|
||||
limiter.AllowN(time.Now(), burstLimit)
|
||||
r.rateLimiter = limiter
|
||||
}
|
||||
}
|
||||
|
||||
type Registry struct {
|
||||
rateLimiter *rate.Limiter
|
||||
ociClient oci.Client
|
||||
router routing.Router
|
||||
transport http.RoundTripper
|
||||
@ -294,7 +307,11 @@ func (r *Registry) handleBlob(c *gin.Context, dgst digest.Digest) {
|
||||
if c.Request.Method == http.MethodHead {
|
||||
return
|
||||
}
|
||||
err = r.ociClient.CopyLayer(c, dgst, c.Writer)
|
||||
var writer io.Writer = c.Writer
|
||||
if r.rateLimiter != nil {
|
||||
writer = throttle.NewWriter(c.Writer, r.rateLimiter)
|
||||
}
|
||||
err = r.ociClient.CopyLayer(c, dgst, writer)
|
||||
if err != nil {
|
||||
//nolint:errcheck // ignore
|
||||
c.AbortWithError(http.StatusInternalServerError, err)
|
||||
|
48
pkg/throttle/byterate.go
Normal file
48
pkg/throttle/byterate.go
Normal file
@ -0,0 +1,48 @@
|
||||
package throttle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
var unmarshalRegex = regexp.MustCompile(`^(\d+)\s?([KMGT]?Bps)$`)
|
||||
|
||||
type Byterate int
|
||||
|
||||
const (
|
||||
Bps Byterate = 1
|
||||
KBps = 1024 * Bps
|
||||
MBps = 1024 * KBps
|
||||
GBps = 1024 * MBps
|
||||
TBps = 1024 * GBps
|
||||
)
|
||||
|
||||
func (br *Byterate) UnmarshalText(b []byte) error {
|
||||
comps := unmarshalRegex.FindStringSubmatch(string(b))
|
||||
if len(comps) != 3 {
|
||||
return fmt.Errorf("invalid byterate format %s should be n Bps, n KBps, n MBps, n GBps, or n TBps", string(b))
|
||||
}
|
||||
v, err := strconv.Atoi(comps[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
unitStr := comps[2]
|
||||
var unit Byterate
|
||||
switch unitStr {
|
||||
case "Bps":
|
||||
unit = Bps
|
||||
case "KBps":
|
||||
unit = KBps
|
||||
case "MBps":
|
||||
unit = MBps
|
||||
case "GBps":
|
||||
unit = GBps
|
||||
case "TBps":
|
||||
unit = TBps
|
||||
default:
|
||||
return fmt.Errorf("unknown unit %s", unitStr)
|
||||
}
|
||||
*br = Byterate(v) * unit
|
||||
return nil
|
||||
}
|
67
pkg/throttle/byterate_test.go
Normal file
67
pkg/throttle/byterate_test.go
Normal file
@ -0,0 +1,67 @@
|
||||
package throttle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestByterateUnmarshalValid(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
expected Byterate
|
||||
}{
|
||||
{
|
||||
input: "1 Bps",
|
||||
expected: 1 * Bps,
|
||||
},
|
||||
{
|
||||
input: "31 KBps",
|
||||
expected: 31 * KBps,
|
||||
},
|
||||
{
|
||||
input: "42 MBps",
|
||||
expected: 42 * MBps,
|
||||
},
|
||||
{
|
||||
input: "120 GBps",
|
||||
expected: 120 * GBps,
|
||||
},
|
||||
{
|
||||
input: "3TBps",
|
||||
expected: 3 * TBps,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.input, func(t *testing.T) {
|
||||
var br Byterate
|
||||
err := br.UnmarshalText([]byte(tt.input))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expected, br)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestByterateUnmarshalInvalid(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
}{
|
||||
{
|
||||
input: "foobar",
|
||||
},
|
||||
{
|
||||
input: "1 Mbps",
|
||||
},
|
||||
{
|
||||
input: "1.1 MBps",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.input, func(t *testing.T) {
|
||||
var br Byterate
|
||||
err := br.UnmarshalText([]byte(tt.input))
|
||||
require.EqualError(t, err, fmt.Sprintf("invalid byterate format %s should be n Bps, n KBps, n MBps, n GBps, or n TBps", tt.input))
|
||||
})
|
||||
}
|
||||
}
|
34
pkg/throttle/writer.go
Normal file
34
pkg/throttle/writer.go
Normal file
@ -0,0 +1,34 @@
|
||||
package throttle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type writer struct {
|
||||
limiter *rate.Limiter
|
||||
writer io.Writer
|
||||
}
|
||||
|
||||
func NewWriter(w io.Writer, limiter *rate.Limiter) io.Writer {
|
||||
return &writer{
|
||||
limiter: limiter,
|
||||
writer: w,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) Write(p []byte) (int, error) {
|
||||
n, err := w.writer.Write(p)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r := w.limiter.ReserveN(time.Now(), n)
|
||||
if !r.OK() {
|
||||
return n, fmt.Errorf("write size %d exceeds limiters burst %d", n, w.limiter.Burst())
|
||||
}
|
||||
time.Sleep(r.Delay())
|
||||
return n, nil
|
||||
}
|
28
pkg/throttle/writer_test.go
Normal file
28
pkg/throttle/writer_test.go
Normal file
@ -0,0 +1,28 @@
|
||||
package throttle
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func TestWriter(t *testing.T) {
|
||||
limit := rate.Limit(500 * Bps)
|
||||
limiter := rate.NewLimiter(limit, 1024*1024)
|
||||
limiter.AllowN(time.Now(), 1024*1024)
|
||||
w := NewWriter(bytes.NewBuffer([]byte{}), limiter)
|
||||
chunkSize := 100
|
||||
start := time.Now()
|
||||
for i := 0; i < 10; i++ {
|
||||
b := make([]byte, chunkSize)
|
||||
n, err := w.Write(b)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, chunkSize, n)
|
||||
}
|
||||
d := time.Since(start)
|
||||
require.Greater(t, d, 2*time.Second)
|
||||
require.Less(t, d, 3*time.Second)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user