Compare commits

...

1 Commits

Author SHA1 Message Date
Philip Laine
8cb0c12921
Add visualization tool to debug image sources 2024-06-09 11:02:38 +02:00
12 changed files with 533 additions and 21 deletions

View File

@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#498](https://github.com/spegel-org/spegel/pull/498) Update to Go 1.22.
- [#499](https://github.com/spegel-org/spegel/pull/499) Add paralleltest linter and set all unit tests to run in parallel.
- [#501](https://github.com/spegel-org/spegel/pull/501) Rename mock router to memory router and add tests.
- [#494](https://github.com/spegel-org/spegel/pull/494) Add visualization tool to debug image sources.
### Deprecated

View File

@ -101,5 +101,6 @@ spec:
| spegel.registries | list | `["https://cgr.dev","https://docker.io","https://ghcr.io","https://quay.io","https://mcr.microsoft.com","https://public.ecr.aws","https://gcr.io","https://registry.k8s.io","https://k8s.gcr.io","https://lscr.io"]` | Registries for which mirror configuration will be created. |
| spegel.resolveLatestTag | bool | `true` | When true latest tags will be resolved to digests. |
| spegel.resolveTags | bool | `true` | When true Spegel will resolve tags to digests. |
| spegel.visualize.enabled | bool | `false` | When true registry requests will be recorded and UI will be served. |
| tolerations | list | `[{"key":"CriticalAddonsOnly","operator":"Exists"},{"effect":"NoExecute","operator":"Exists"},{"effect":"NoSchedule","operator":"Exists"}]` | Tolerations for pod assignment. |
| updateStrategy | object | `{}` | An update strategy to replace existing pods with new pods. |

View File

@ -78,6 +78,7 @@ spec:
- --registry-addr=:{{ .Values.service.registry.port }}
- --router-addr=:{{ .Values.service.router.port }}
- --metrics-addr=:{{ .Values.service.metrics.port }}
- --visualize-enabled={{ .Values.spegel.visualize.enabled }}
{{- with .Values.spegel.registries }}
- --registries
{{- range . }}

View File

@ -160,3 +160,6 @@ spegel:
blobSpeed: ""
# -- When true existing mirror configuration will be appended to instead of replaced.
appendMirrors: false
visualize:
# -- When true registry requests will be recorded and UI will be served.
enabled: false

View File

@ -13,20 +13,23 @@ Read the [benchmark documentation](./BENCHMARK.md) for information of expected g
## How do I know that Spegel is working?
Spegel is meant to be a painless experience to install, meaning that it may be difficult initially to know if things are working or not. Simply put a good indicator that things are working is if all Spegel pods have started and are in a ready state.
Spegel does a couple of checks on startup to verify that any required configuration is correct, if it is not it will exit with an error. While it runs it will log all received requests, both those it mirrors and it serves.
Spegel is meant to be a painless experience to install. Image pulls will fallback to the original registry if Spegel does not work, meaning that it can be difficult to determine if things are working or not. Spegel has a UI that visualizes incoming and outgoing requests, this will allow you understand if images are pulled from other Spegel instances or not.
An incoming request to Spegel that is mirrored will receive the following log.
The UI is disabled by default as it adds additional overhead. To access the UI enable the feature in the Helm values.
```
{"level":"info","ts":1692304805.9038486,"caller":"gin@v0.0.9/logger.go:53","msg":"","path":"/v2/library/nginx/blobs/sha256:1cb127bd932119089b5ffb612ffa84537ddd1318e6784f2fce80916bbb8bd166","status":200,"method":"GET","latency":0.005075836,"ip":"172.18.0.5","handler":"mirror"}
```yaml
spegel:
visualize:
enabled: true
```
While the Spegel instance on the other end will log.
After all Spegel instances have restarted you can port forward to one of the Spegel pods.
```shell
kubectl -n spegel port-forward ${POD_NAME} 9090
```
{"level":"info","ts":1692304805.9035861,"caller":"gin@v0.0.9/logger.go:53","msg":"","path":"/v2/library/nginx/blobs/sha256:1cb127bd932119089b5ffb612ffa84537ddd1318e6784f2fce80916bbb8bd166","status":200,"method":"GET","latency":0.003644997,"ip":"172.18.0.5","handler":"blob"}
```
Open the UI at `http://localhost:9090/visualize` in a browser. If all is configured propery you should be presented with and interface showing registry requests.
## Will image pulls break or be delayed if a spegel instance fails or is removed?

View File

@ -29,6 +29,7 @@ import (
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/state"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)
type ConfigurationCmd struct {
@ -63,6 +64,7 @@ type RegistryCmd struct {
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout,env:MIRROR_RESOLVE_TIMEOUT" default:"5s" help:"Max duration spent finding a mirror."`
MirrorResolveRetries int `arg:"--mirror-resolve-retries,env:MIRROR_RESOLVE_RETRIES" default:"3" help:"Max amount of mirrors to attempt."`
ResolveLatestTag bool `arg:"--resolve-latest-tag,env:RESOLVE_LATEST_TAG" default:"true" help:"When true latest tags will be resolved to digests."`
VisualizeEnabled bool `arg:"--visualize-enabled,env:VISUALIZE_ENABLED" default:"false" help:"When true visualizer will run and record events."`
}
type Arguments struct {
@ -141,6 +143,11 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
mux.Handle("/debug/pprof/block", pprof.Handler("block"))
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
var eventStore visualize.EventStore
if args.VisualizeEnabled {
eventStore = visualize.NewMemoryStore()
mux.Handle("/visualize/", visualize.Handler(eventStore))
}
metricsSrv := &http.Server{
Addr: args.MetricsAddr,
Handler: mux,
@ -195,6 +202,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
registry.WithResolveTimeout(args.MirrorResolveTimeout),
registry.WithLocalAddress(args.LocalAddr),
registry.WithLogger(log),
registry.WithEventStore(eventStore),
}
if args.BlobSpeed != nil {
registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed))

View File

@ -31,6 +31,13 @@ func (r reference) hasLatestTag() bool {
return tag == "latest"
}
func (r reference) tagOrDigest() string {
if r.name != "" {
return r.name
}
return r.dgst.String()
}
// Package is used to parse components from requests which comform with the OCI distribution spec.
// https://github.com/opencontainers/distribution-spec/blob/main/spec.md
// /v2/<name>/manifests/<reference>

View File

@ -8,6 +8,7 @@ import (
"net"
"net/http"
"net/http/httputil"
"net/netip"
"net/url"
"path"
"strconv"
@ -21,6 +22,7 @@ import (
"github.com/spegel-org/spegel/pkg/oci"
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)
const (
@ -28,6 +30,7 @@ const (
)
type Registry struct {
eventStore visualize.EventStore
log logr.Logger
throttler *throttle.Throttler
ociClient oci.Client
@ -83,6 +86,12 @@ func WithLogger(log logr.Logger) Option {
}
}
func WithEventStore(eventStore visualize.EventStore) Option {
return func(r *Registry) {
r.eventStore = eventStore
}
}
func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *Registry {
r := &Registry{
ociClient: ociClient,
@ -188,6 +197,17 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str
return "mirror"
}
if r.eventStore != nil {
defer func() {
ip := getClientIP(req)
addr, err := netip.ParseAddr(ip)
if err != nil {
return
}
r.eventStore.RecordRequest(ref.tagOrDigest(), addr, req.Method, rw.Status(), false)
}()
}
// Serve registry endpoints.
switch ref.kind {
case referenceKindManifest:
@ -203,12 +223,7 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str
}
func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref reference) {
key := ref.dgst.String()
if key == "" {
key = ref.name
}
log := r.log.WithValues("key", key, "path", req.URL.Path, "ip", getClientIP(req))
log := r.log.WithValues("key", ref.tagOrDigest(), "path", req.URL.Path, "ip", getClientIP(req))
isExternal := r.isExternalRequest(req)
if isExternal {
@ -237,7 +252,7 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
resolveCtx, cancel := context.WithTimeout(req.Context(), r.resolveTimeout)
defer cancel()
resolveCtx = logr.NewContext(resolveCtx, log)
peerCh, err := r.router.Resolve(resolveCtx, key, isExternal, r.resolveRetries)
peerCh, err := r.router.Resolve(resolveCtx, ref.tagOrDigest(), isExternal, r.resolveRetries)
if err != nil {
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("error occurred when attempting to resolve mirrors: %w", err))
return
@ -248,12 +263,18 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
select {
case <-req.Context().Done():
// Request has been closed by server or client. No use continuing.
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", key, resolveCtx.Err()))
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", ref.tagOrDigest(), resolveCtx.Err()))
return
case ipAddr, ok := <-peerCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
err = fmt.Errorf("mirror with image component %s could not be found", key)
// Register not found if no mirror attempts have been made.
fmt.Println("mirror channel closed", ref.tagOrDigest(), mirrorAttempts)
if r.eventStore != nil && mirrorAttempts == 0 {
r.eventStore.RecordNoMirrors(ref.tagOrDigest())
}
err = fmt.Errorf("mirror with image component %s could not be found", ref.tagOrDigest())
if mirrorAttempts > 0 {
err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts))
}
@ -288,6 +309,12 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
return nil
}
proxy.ServeHTTP(rw, req)
// Track image events if enabled
if r.eventStore != nil {
r.eventStore.RecordRequest(ref.tagOrDigest(), ipAddr.Addr(), req.Method, rw.Status(), true)
}
if !succeeded {
break
}

150
pkg/visualize/store.go Normal file
View File

@ -0,0 +1,150 @@
package visualize
import (
"net/http"
"net/netip"
"sync"
"time"
)
type GraphData struct {
Nodes []Node `json:"nodes"`
Links []Link `json:"links"`
}
type Node struct {
ID string `json:"id"`
}
type Link struct {
ID string `json:"id"`
Source string `json:"source"`
Target string `json:"target"`
Status int `json:"status"`
}
type EventStore interface {
RecordNoMirrors(id string)
RecordRequest(id string, peer netip.Addr, method string, status int, mirror bool)
FilterByDirection(rootIsSource bool) EventStore
Graph() GraphData
LastModified() time.Time
}
type edge struct {
node string
id string
status int
rootIsSource bool
}
var _ EventStore = &MemoryStore{}
type MemoryStore struct {
lastModified time.Time
edgeIndex map[string]int
edges []edge
mx sync.RWMutex
}
func NewMemoryStore() *MemoryStore {
return &MemoryStore{
edges: []edge{},
edgeIndex: map[string]int{},
}
}
func (m *MemoryStore) set(e edge) {
m.mx.Lock()
defer m.mx.Unlock()
m.lastModified = time.Now()
if idx, ok := m.edgeIndex[e.id]; ok {
m.edges[idx] = e
return
}
m.edges = append(m.edges, e)
m.edgeIndex[e.id] = len(m.edges) - 1
}
func (m *MemoryStore) RecordNoMirrors(id string) {
e := edge{
node: "Not Found",
id: id,
rootIsSource: true,
}
m.set(e)
}
func (m *MemoryStore) RecordRequest(id string, peer netip.Addr, method string, status int, mirror bool) {
if method != http.MethodGet {
return
}
e := edge{
node: peer.String(),
id: id,
status: status,
rootIsSource: mirror,
}
m.set(e)
}
func (m *MemoryStore) FilterByDirection(rootIsSource bool) EventStore { //nolint: ireturn // Have to return interface to implement interface.
m.mx.RLock()
defer m.mx.RUnlock()
f := NewMemoryStore()
f.lastModified = m.lastModified
for _, edge := range m.edges {
if edge.rootIsSource != rootIsSource {
continue
}
f.edges = append(f.edges, edge)
f.edgeIndex[edge.id] = len(f.edges) - 1
}
return f
}
func (m *MemoryStore) Graph() GraphData {
m.mx.RLock()
defer m.mx.RUnlock()
gd := GraphData{
Nodes: []Node{
{
ID: "self",
},
},
Links: []Link{},
}
nodeIndex := map[string]interface{}{}
for _, edge := range m.edges {
src := gd.Nodes[0].ID
dest := edge.node
if !edge.rootIsSource {
src = edge.node
dest = gd.Nodes[0].ID
}
link := Link{
ID: edge.id,
Source: src,
Target: dest,
Status: edge.status,
}
gd.Links = append(gd.Links, link)
if _, ok := nodeIndex[edge.node]; ok {
continue
}
gd.Nodes = append(gd.Nodes, Node{ID: edge.node})
nodeIndex[edge.node] = nil
}
return gd
}
func (m *MemoryStore) LastModified() time.Time {
m.mx.RLock()
defer m.mx.RUnlock()
return m.lastModified
}

116
pkg/visualize/store_test.go Normal file
View File

@ -0,0 +1,116 @@
package visualize
import (
"net/http"
"net/netip"
"testing"
"github.com/stretchr/testify/require"
)
func TestMemoryStore(t *testing.T) {
t.Parallel()
store := NewMemoryStore()
store.RecordRequest("one", netip.MustParseAddr("127.0.0.1"), http.MethodGet, http.StatusOK, true)
store.RecordRequest("two", netip.MustParseAddr("127.0.0.1"), http.MethodGet, http.StatusNotFound, true)
store.RecordRequest("three", netip.MustParseAddr("10.0.0.0"), http.MethodGet, http.StatusOK, false)
tests := []struct {
name string
store EventStore
expectedNodes []Node
expectedLinks []Link
}{
{
name: "no filter",
store: store,
expectedNodes: []Node{
{
ID: "self",
},
{
ID: "127.0.0.1",
},
{
ID: "10.0.0.0",
},
},
expectedLinks: []Link{
{
ID: "one",
Source: "self",
Target: "127.0.0.1",
Status: http.StatusOK,
},
{
ID: "two",
Source: "self",
Target: "127.0.0.1",
Status: http.StatusNotFound,
},
{
ID: "three",
Source: "10.0.0.0",
Target: "self",
Status: http.StatusOK,
},
},
},
{
name: "only from root",
store: store.FilterByDirection(true),
expectedNodes: []Node{
{
ID: "self",
},
{
ID: "127.0.0.1",
},
},
expectedLinks: []Link{
{
ID: "one",
Source: "self",
Target: "127.0.0.1",
Status: http.StatusOK,
},
{
ID: "two",
Source: "self",
Target: "127.0.0.1",
Status: http.StatusNotFound,
},
},
},
{
name: "only to root",
store: store.FilterByDirection(false),
expectedNodes: []Node{
{
ID: "self",
},
{
ID: "10.0.0.0",
},
},
expectedLinks: []Link{
{
ID: "three",
Source: "10.0.0.0",
Target: "self",
Status: http.StatusOK,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
gd := tt.store.Graph()
require.ElementsMatch(t, tt.expectedNodes, gd.Nodes)
require.ElementsMatch(t, tt.expectedLinks, gd.Links)
})
}
}

195
pkg/visualize/visualize.go Normal file
View File

@ -0,0 +1,195 @@
package visualize
import (
"encoding/hex"
"encoding/json"
"hash/fnv"
"net/http"
"strconv"
"github.com/spegel-org/spegel/internal/mux"
)
func Handler(store EventStore) http.Handler {
handler := func(rw mux.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/visualize/":
indexHandler(rw, req)
case "/visualize/graph":
graphHandler(rw, req, store)
default:
rw.WriteHeader(http.StatusNotFound)
}
}
return mux.NewServeMux(handler)
}
func indexHandler(rw mux.ResponseWriter, _ *http.Request) {
index := `
<!DOCTYPE html>
<html lang="en">
<head>
<title>Spegel</title>
<script src="https://unpkg.com/htmx.org@1.9.12"></script>
<script src="https://unpkg.com/force-graph@v1.43.5"></script>
<link rel="icon" href="data:,">
<link href='https://fonts.googleapis.com/css?family=Open Sans' rel='stylesheet'>
<style>
body {
margin: 0;
font-family: 'Open Sans';
font-size: 16px;
}
main {
margin: 0 auto;
max-width: 1060px;
width: 100%;
}
fieldset {
margin-bottom: 10px;
}
.container {
display: flex;
flex-direction: column;
gap: 10px;
}
#graph {
margin: 0 2px;
height: 700px;
border-width: 2px;
border-style: groove;
border-color: rgb(192, 192, 192);
border-image: initial;
}
</style>
</head>
<body>
<main>
<div class="container">
<h1>Spegel</h1>
<form hx-get="/visualize/graph" hx-trigger="load, change, every 2s" hx-swap="none" hx-on::after-request="drawGraph(event)">
<fieldset>
<legend>Request Direction</legend>
<input hx-preserve type="radio" id="incoming" name="direction" value="false" />
<label for="incoming">Incoming</label>
<input hx-preserve type="radio" id="both" name="direction" value="" checked />
<label for="both">Both</label>
<input hx-preserve type="radio" id="outgoing" name="direction" value="true" />
<label for="outgoing">Outgoing</label>
</fieldset>
</form>
<div id="graph"></div>
<script>
const elem = document.getElementById('graph');
const graph = ForceGraph()(elem);
graph.nodeId('id')
.width(elem.clientWidth)
.height(elem.clientHeight)
.nodeLabel('id')
.nodeColor((n) => {
if (n.id == "self") {
return 'rgba(166, 166, 168, 1)'
}
return 'rgba(0, 109, 170, 1)'
})
.linkLabel('id')
.linkColor((n) => {
switch (n.status) {
case 0:
return "yellow";
case 200:
return "green";
default:
return "red";
}
})
.linkCurvature('curvature')
.linkDirectionalArrowRelPos(1)
.linkDirectionalArrowLength(2);
var etag = ""
function drawGraph(event) {
if (event.detail.pathInfo.requestPath != "/visualize/graph") {
return
}
if (event.detail.successful != true) {
return console.error(event);
}
let newEtag = event.detail.xhr.getResponseHeader("etag")
if (etag == newEtag) {
return
}
etag = newEtag
let data = JSON.parse(event.detail.xhr.response)
// Compute the curvature for links sharing the same two nodes to avoid overlaps
let sameNodesLinks = {};
const curvatureMinMax = 0.5;
data.links.forEach(link => {
link.nodePairId = link.source <= link.target ? (link.source + "_" + link.target) : (link.target + "_" + link.source);
let map = link.source === link.target ? selfLoopLinks : sameNodesLinks;
if (!map[link.nodePairId]) {
map[link.nodePairId] = [];
}
map[link.nodePairId].push(link);
});
Object.keys(sameNodesLinks).filter(nodePairId => sameNodesLinks[nodePairId].length > 1).forEach(nodePairId => {
let links = sameNodesLinks[nodePairId];
let lastIndex = links.length - 1;
let lastLink = links[lastIndex];
lastLink.curvature = curvatureMinMax;
let delta = 2 * curvatureMinMax / lastIndex;
for (let i = 0; i < lastIndex; i++) {
links[i].curvature = - curvatureMinMax + i * delta;
if (lastLink.source !== links[i].source) {
links[i].curvature *= -1; // flip it around, otherwise they overlap
}
}
});
graph.graphData(data)
}
</script>
</div>
</main>
</body>
</html>
`
//nolint: errcheck // Ignore error.
rw.Write([]byte(index))
}
func graphHandler(rw mux.ResponseWriter, req *http.Request, store EventStore) {
directionFilter := req.URL.Query().Get("direction")
if directionFilter != "" {
isRootSource, err := strconv.ParseBool(directionFilter)
if err != nil {
rw.WriteError(http.StatusBadRequest, err)
return
}
store = store.FilterByDirection(isRootSource)
}
eTagValue := directionFilter + "-" + store.LastModified().String()
hash := fnv.New32a()
_, err := hash.Write([]byte(eTagValue))
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
eTag := hex.EncodeToString(hash.Sum(nil))
if eTag == req.Header.Get("If-None-Match") {
rw.WriteHeader(http.StatusNotModified)
return
}
rw.Header().Set("etag", eTag)
gd := store.Graph()
b, err := json.Marshal(&gd)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
//nolint: errcheck // Ignore error.
rw.Write(b)
}

View File

@ -44,10 +44,10 @@ else
for NODE in control-plane worker2 worker3 worker4
do
NAME=$KIND_NAME-$NODE
docker exec $NAME ctr -n k8s.io image rm docker.io/library/nginx:1.21.0@sha256:2f1cd90e00fe2c991e18272bb35d6a8258eeb27785d121aa4cc1ae4235167cfd
docker exec $NAME ctr -n k8s.io image rm docker.io/library/nginx:1.23.0
docker exec $NAME ctr -n k8s.io image rm docker.io/library/nginx@sha256:b3a676a9145dc005062d5e79b92d90574fb3bf2396f4913dc1732f9065f55c4b
docker exec $NAME ctr -n k8s.io image rm mcr.microsoft.com/containernetworking/azure-cns@sha256:7944413c630746a35d5596f56093706e8d6a3db0569bec0c8e58323f965f7416
docker exec $NAME crictl rmi docker.io/library/nginx:1.21.0@sha256:2f1cd90e00fe2c991e18272bb35d6a8258eeb27785d121aa4cc1ae4235167cfd || true
docker exec $NAME crictl rmi docker.io/library/nginx:1.23.0 || true
docker exec $NAME crictl rmi docker.io/library/nginx@sha256:b3a676a9145dc005062d5e79b92d90574fb3bf2396f4913dc1732f9065f55c4b || true
docker exec $NAME crictl rmi mcr.microsoft.com/containernetworking/azure-cns@sha256:7944413c630746a35d5596f56093706e8d6a3db0569bec0c8e58323f965f7416 || true
done
# Delete Spegel from all nodes