Compare commits

...

1 Commits

Author SHA1 Message Date
Philip Laine
0fd3ba80b1
Add visualization tool to debug image sources 2024-05-26 11:36:10 +02:00
9 changed files with 641 additions and 0 deletions

View File

@ -101,5 +101,6 @@ spec:
| spegel.registries | list | `["https://cgr.dev","https://docker.io","https://ghcr.io","https://quay.io","https://mcr.microsoft.com","https://public.ecr.aws","https://gcr.io","https://registry.k8s.io","https://k8s.gcr.io","https://lscr.io"]` | Registries for which mirror configuration will be created. |
| spegel.resolveLatestTag | bool | `true` | When true latest tags will be resolved to digests. |
| spegel.resolveTags | bool | `true` | When true Spegel will resolve tags to digests. |
| spegel.viualizeEnabled | bool | `false` | When true visualizer will run and record events. |
| tolerations | list | `[{"key":"CriticalAddonsOnly","operator":"Exists"},{"effect":"NoExecute","operator":"Exists"},{"effect":"NoSchedule","operator":"Exists"}]` | Tolerations for pod assignment. |
| updateStrategy | object | `{}` | An update strategy to replace existing pods with new pods. |

View File

@ -78,6 +78,7 @@ spec:
- --registry-addr=:{{ .Values.service.registry.port }}
- --router-addr=:{{ .Values.service.router.port }}
- --metrics-addr=:{{ .Values.service.metrics.port }}
- --visualize-enabled={{ .Values.spegel.visualizeEnabled }}
{{- with .Values.spegel.registries }}
- --registries
{{- range . }}

View File

@ -160,3 +160,5 @@ spegel:
blobSpeed: ""
# -- When true existing mirror configuration will be appended to instead of replaced.
appendMirrors: false
# -- When true visualizer will run and record events.
viualizeEnabled: false

51
main.go
View File

@ -29,6 +29,7 @@ import (
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/state"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)
type ConfigurationCmd struct {
@ -63,9 +64,19 @@ type RegistryCmd struct {
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout,env:MIRROR_RESOLVE_TIMEOUT" default:"5s" help:"Max duration spent finding a mirror."`
MirrorResolveRetries int `arg:"--mirror-resolve-retries,env:MIRROR_RESOLVE_RETRIES" default:"3" help:"Max amount of mirrors to attempt."`
ResolveLatestTag bool `arg:"--resolve-latest-tag,env:RESOLVE_LATEST_TAG" default:"true" help:"When true latest tags will be resolved to digests."`
VisualizeEnabled bool `arg:"--visualize-enabled,env:VISUALIZE_ENABLED" default:"false" help:"When true visualizer will run and record events."`
}
type VisualizationCmd struct {
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path,env:CONTAINERD_REGISTRY_CONFIG_PATH" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
ContainerdSock string `arg:"--containerd-sock,env:CONTAINERD_SOCK" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
ContainerdNamespace string `arg:"--containerd-namespace,env:CONTAINERD_NAMESPACE" default:"k8s.io" help:"Containerd namespace to fetch images from."`
ContainerdContentPath string `arg:"--containerd-content-path,env:CONTAINERD_CONTENT_PATH" default:"/var/lib/containerd/io.containerd.content.v1.content" help:"Path to Containerd content store"`
Registries []url.URL `arg:"--registries,env:REGISTRIES,required" help:"registries that are configured to be mirrored."`
}
type Arguments struct {
Visualization *VisualizationCmd `arg:"subcommand:visualization"`
Configuration *ConfigurationCmd `arg:"subcommand:configuration"`
Registry *RegistryCmd `arg:"subcommand:registry"`
LogLevel slog.Level `arg:"--log-level,env:LOG_LEVEL" default:"INFO" help:"Minimum log level to output. Value should be DEBUG, INFO, WARN, or ERROR."`
@ -96,6 +107,8 @@ func run(ctx context.Context, args *Arguments) error {
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM)
defer cancel()
switch {
case args.Visualization != nil:
return visualizeCommand(ctx, args.Visualization)
case args.Configuration != nil:
return configurationCommand(ctx, args.Configuration)
case args.Registry != nil:
@ -105,6 +118,38 @@ func run(ctx context.Context, args *Arguments) error {
}
}
func visualizeCommand(ctx context.Context, args *VisualizationCmd) error {
eventStore := visualize.NewMemoryStore()
ociClient, err := oci.NewContainerd(args.ContainerdSock, args.ContainerdNamespace, args.ContainerdRegistryConfigPath, args.Registries, oci.WithContentPath(args.ContainerdContentPath))
if err != nil {
return err
}
imgs, err := ociClient.ListImages(ctx)
if err != nil {
return err
}
for _, img := range imgs {
ids, err := ociClient.AllIdentifiers(ctx, img)
if err != nil {
return err
}
for _, id := range ids {
eventStore.RecordExisting(id, img.Registry)
}
}
mux := http.NewServeMux()
mux.Handle("/visualize/", visualize.Handler(ociClient, eventStore))
srv := &http.Server{
Addr: ":9090",
Handler: mux,
}
err = srv.ListenAndServe()
if err != nil {
return err
}
return nil
}
func configurationCommand(ctx context.Context, args *ConfigurationCmd) error {
fs := afero.NewOsFs()
err := oci.AddMirrorConfiguration(ctx, fs, args.ContainerdRegistryConfigPath, args.Registries, args.MirrorRegistries, args.ResolveTags, args.AppendMirrors)
@ -141,6 +186,11 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
mux.Handle("/debug/pprof/block", pprof.Handler("block"))
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
var eventStore visualize.EventStore
if args.VisualizeEnabled {
eventStore = visualize.NewMemoryStore()
mux.Handle("/visualize/", visualize.Handler(ociClient, eventStore))
}
metricsSrv := &http.Server{
Addr: args.MetricsAddr,
Handler: mux,
@ -195,6 +245,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
registry.WithResolveTimeout(args.MirrorResolveTimeout),
registry.WithLocalAddress(args.LocalAddr),
registry.WithLogger(log),
registry.WithEventStore(eventStore),
}
if args.BlobSpeed != nil {
registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed))

View File

@ -8,6 +8,7 @@ import (
"net"
"net/http"
"net/http/httputil"
"net/netip"
"net/url"
"path"
"strconv"
@ -21,6 +22,7 @@ import (
"github.com/spegel-org/spegel/pkg/oci"
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)
const (
@ -37,6 +39,7 @@ type Registry struct {
resolveRetries int
resolveTimeout time.Duration
resolveLatestTag bool
eventStore visualize.EventStore
}
type Option func(*Registry)
@ -83,6 +86,12 @@ func WithLogger(log logr.Logger) Option {
}
}
func WithEventStore(eventStore visualize.EventStore) Option {
return func(r *Registry) {
r.eventStore = eventStore
}
}
func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *Registry {
r := &Registry{
ociClient: ociClient,
@ -188,6 +197,23 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str
return "mirror"
}
defer func() {
if req.Method != http.MethodGet {
return
}
id := ref.name
if id == "" {
// First 7 character plus sha256: prefix
id = ref.dgst.String()[:14]
}
ip := getClientIP(req)
addr, err := netip.ParseAddr(ip)
if err != nil {
return
}
r.eventStore.RecordRequest(id, addr, rw.Status(), false)
}()
// Serve registry endpoints.
switch ref.kind {
case referenceKindManifest:
@ -288,6 +314,19 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
return nil
}
proxy.ServeHTTP(rw, req)
// Track image events if enabled
if r.eventStore != nil {
if req.Method != http.MethodGet {
return
}
id := ref.name
if id == "" {
id = ref.dgst.String()
}
r.eventStore.RecordRequest(id, ipAddr.Addr(), rw.Status(), true)
}
if !succeeded {
break
}

161
pkg/visualize/store.go Normal file
View File

@ -0,0 +1,161 @@
package visualize
import (
"net/http"
"net/netip"
"sync"
)
type GraphData struct {
Nodes []Node `json:"nodes"`
Links []Link `json:"links"`
}
type Node struct {
ID string `json:"id"`
}
type Link struct {
ID string `json:"id"`
Source string `json:"source"`
Target string `json:"target"`
Color string `json:"color"`
}
type EventStore interface {
RecordExisting(id string, registry string)
RecordRequest(id string, peer netip.Addr, status int, mirror bool)
FilterById(include []string) EventStore
FilterByDirection(rootIsSource bool) EventStore
Graph() GraphData
}
// TODO: Include blob or manifest
type edge struct {
node string
id string
status int
rootIsSource bool
}
var _ EventStore = &MemoryStore{}
type MemoryStore struct {
mx sync.RWMutex
edges []edge
edgeIndex map[string]int
}
func NewMemoryStore() *MemoryStore {
return &MemoryStore{
edges: []edge{},
edgeIndex: map[string]int{},
}
}
func (m *MemoryStore) RecordExisting(id string, registry string) {
m.record(id, registry, http.StatusOK, true)
}
func (m *MemoryStore) RecordRequest(id string, peer netip.Addr, status int, mirror bool) {
m.record(id, peer.String(), status, mirror)
}
func (m *MemoryStore) record(id string, node string, status int, rootIsSource bool) {
m.mx.Lock()
defer m.mx.Unlock()
e := edge{
node: node,
id: id,
status: status,
rootIsSource: rootIsSource,
}
if idx, ok := m.edgeIndex[id]; ok {
m.edges[idx] = e
return
}
m.edges = append(m.edges, e)
m.edgeIndex[id] = len(m.edges) - 1
}
func (m *MemoryStore) FilterById(include []string) EventStore {
m.mx.RLock()
defer m.mx.RUnlock()
f := NewMemoryStore()
for _, v := range include {
idx, ok := m.edgeIndex[v]
if !ok {
continue
}
edge := m.edges[idx]
f.edges = append(f.edges, edge)
f.edgeIndex[v] = len(f.edges) - 1
}
return f
}
func (m *MemoryStore) FilterByDirection(rootIsSource bool) EventStore {
m.mx.RLock()
defer m.mx.RUnlock()
f := NewMemoryStore()
for _, edge := range m.edges {
if edge.rootIsSource != rootIsSource {
continue
}
f.edges = append(f.edges, edge)
f.edgeIndex[edge.id] = len(f.edges) - 1
}
return f
}
func (m *MemoryStore) Graph() GraphData {
m.mx.RLock()
defer m.mx.RUnlock()
gd := GraphData{
Nodes: []Node{
{
ID: "self",
},
},
Links: []Link{},
}
nodeIndex := map[string]interface{}{}
for _, edge := range m.edges {
src := gd.Nodes[0].ID
dest := edge.node
if !edge.rootIsSource {
src = edge.node
dest = gd.Nodes[0].ID
}
color := linkColor(edge.status)
link := Link{
ID: edge.id,
Source: src,
Target: dest,
Color: color,
}
gd.Links = append(gd.Links, link)
if _, ok := nodeIndex[edge.node]; ok {
continue
}
gd.Nodes = append(gd.Nodes, Node{ID: edge.node})
nodeIndex[edge.node] = nil
}
return gd
}
func linkColor(status int) string {
switch status {
case 0:
return "yellow"
case http.StatusOK:
return "green"
default:
return "red"
}
}

136
pkg/visualize/store_test.go Normal file
View File

@ -0,0 +1,136 @@
package visualize
import (
"net/http"
"net/netip"
"testing"
"github.com/stretchr/testify/require"
)
func TestMemoryStore(t *testing.T) {
t.Parallel()
store := NewMemoryStore()
store.RecordRequest("one", netip.MustParseAddr("127.0.0.1"), http.StatusOK, true)
store.RecordRequest("two", netip.MustParseAddr("127.0.0.1"), http.StatusNotFound, true)
store.RecordRequest("three", netip.MustParseAddr("10.0.0.0"), http.StatusOK, false)
tests := []struct {
name string
store EventStore
expectedNodes []Node
expectedLinks []Link
}{
{
name: "no filter",
store: store,
expectedNodes: []Node{
{
ID: "self",
},
{
ID: "127.0.0.1",
},
{
ID: "10.0.0.0",
},
},
expectedLinks: []Link{
{
ID: "one",
Source: "self",
Target: "127.0.0.1",
Color: "green",
},
{
ID: "two",
Source: "self",
Target: "127.0.0.1",
Color: "red",
},
{
ID: "three",
Source: "10.0.0.0",
Target: "self",
Color: "green",
},
},
},
{
name: "only from root",
store: store.FilterByDirection(true),
expectedNodes: []Node{
{
ID: "self",
},
{
ID: "127.0.0.1",
},
},
expectedLinks: []Link{
{
ID: "one",
Source: "self",
Target: "127.0.0.1",
Color: "green",
},
{
ID: "two",
Source: "self",
Target: "127.0.0.1",
Color: "red",
},
},
},
{
name: "only to root",
store: store.FilterByDirection(false),
expectedNodes: []Node{
{
ID: "self",
},
{
ID: "10.0.0.0",
},
},
expectedLinks: []Link{
{
ID: "three",
Source: "10.0.0.0",
Target: "self",
Color: "green",
},
},
},
{
name: "filter links",
store: store.FilterById([]string{"three"}),
expectedNodes: []Node{
{
ID: "self",
},
{
ID: "10.0.0.0",
},
},
expectedLinks: []Link{
{
ID: "three",
Source: "10.0.0.0",
Target: "self",
Color: "green",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
gd := tt.store.Graph()
require.ElementsMatch(t, tt.expectedNodes, gd.Nodes)
require.ElementsMatch(t, tt.expectedLinks, gd.Links)
})
}
}

248
pkg/visualize/visualize.go Normal file
View File

@ -0,0 +1,248 @@
package visualize
import (
"encoding/json"
"html/template"
"net/http"
"strconv"
"github.com/spegel-org/spegel/internal/mux"
"github.com/spegel-org/spegel/pkg/oci"
)
// NOTE: image could be discoverd by peeking at the manifest content?
// TODO: When layer is not found it should default to subgraph for original registry
func Handler(ociClient oci.Client, store EventStore) http.Handler {
handler := func(rw mux.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/visualize/":
indexHandler(rw, req)
case "/visualize/images":
imagesHandler(rw, req, ociClient)
case "/visualize/graph":
graphHandler(rw, req, ociClient, store)
default:
rw.WriteHeader(http.StatusNotFound)
}
}
return mux.NewServeMux(handler)
}
func indexHandler(rw mux.ResponseWriter, _ *http.Request) {
index := `
<!DOCTYPE html>
<html lang="en">
<head>
<title>Spegel</title>
<script src="https://unpkg.com/htmx.org@1.9.12"></script>
<script src="https://unpkg.com/force-graph@v1.43.5"></script>
<link href='https://fonts.googleapis.com/css?family=Open Sans' rel='stylesheet'>
<style>
body {
margin: 0;
font-family: 'Open Sans';
font-size: 16px;
}
main {
margin: 0 auto;
max-width: 1060px;
width: 100%;
}
fieldset {
margin-bottom: 10px;
}
.container {
display: flex;
flex-direction: column;
gap: 10px;
}
#graph {
border: 1px solid black;
display: flex;
}
</style>
</head>
<body>
<main>
<div class="container">
<h1>Spegel</h1>
<form hx-get="/visualize/graph" hx-trigger="load, change" hx-swap="none" hx-on::after-request="drawGraph(event)">
<fieldset>
<legend>Request Direction</legend>
<input hx-preserve type="radio" id="incoming" name="direction" value="false" />
<label for="incoming">Incoming</label>
<input hx-preserve type="radio" id="both" name="direction" value="" checked />
<label for="both">Both</label>
<input hx-preserve type="radio" id="outgoing" name="direction" value="true" />
<label for="outgoing">Outgoing</label>
</fieldset>
<fieldset>
<legend>Images</legend>
<div hx-get="/visualize/images" hx-trigger="load, every 1s" hx-swap="innerHTML"></div>
</fieldset>
</form>
<div id="graph"></div>
<script>
const elem = document.getElementById('graph');
const graph = ForceGraph()(elem);
graph.width(1060)
.height(700)
.nodeId('id')
.nodeLabel('id')
.linkLabel('id')
.linkColor('color')
.linkCurvature('curvature')
.linkDirectionalArrowRelPos(1)
.linkDirectionalArrowLength(3);
function drawGraph(event) {
if (event.detail.pathInfo.requestPath != "/visualize/graph") {
return
}
if (event.detail.successful != true) {
return console.error(event);
}
let data = JSON.parse(event.detail.xhr.response)
// Compute the curvature for links sharing the same two nodes to avoid overlaps
let sameNodesLinks = {};
const curvatureMinMax = 0.5;
data.links.forEach(link => {
link.nodePairId = link.source <= link.target ? (link.source + "_" + link.target) : (link.target + "_" + link.source);
let map = link.source === link.target ? selfLoopLinks : sameNodesLinks;
if (!map[link.nodePairId]) {
map[link.nodePairId] = [];
}
map[link.nodePairId].push(link);
});
Object.keys(sameNodesLinks).filter(nodePairId => sameNodesLinks[nodePairId].length > 1).forEach(nodePairId => {
let links = sameNodesLinks[nodePairId];
let lastIndex = links.length - 1;
let lastLink = links[lastIndex];
lastLink.curvature = curvatureMinMax;
let delta = 2 * curvatureMinMax / lastIndex;
for (let i = 0; i < lastIndex; i++) {
links[i].curvature = - curvatureMinMax + i * delta;
if (lastLink.source !== links[i].source) {
links[i].curvature *= -1; // flip it around, otherwise they overlap
}
}
});
graph.graphData(data)
}
</script>
</div>
</main>
</body>
</html>
`
rw.Write([]byte(index))
}
func imagesHandler(rw mux.ResponseWriter, req *http.Request, ociClient oci.Client) {
imgs, err := ociClient.ListImages(req.Context())
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
tmpl, err := template.New("images").Parse(`
<style>
table {
margin-top: 10px;
width: 100%;
}
table, th, td {
border: 1px solid black;
border-collapse: collapse;
}
th, td {
padding: 8px 5px;
}
th {
text-align: left;
}
th:nth-child(3) {
text-align: right;
}
td:nth-child(3) {
text-align: right;
}
</style>
<table>
<thead>
<tr>
<th></th>
<th>Name</th>
<th>Created</th>
</tr>
</thead>
<tbody>
{{ range $i, $element := . }}
<tr>
<td><input type="radio" hx-preserve id="{{ $i }}" name="image" value="{{ $element }}" {{ if eq $i 0 }}checked{{ end }} /></td>
<td>{{ $element }}</td>
<td>Today</td>
</tr>
{{ end }}
</tbody>
</table>
`)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
err = tmpl.Execute(rw, imgs)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
}
func graphHandler(rw mux.ResponseWriter, req *http.Request, ociClient oci.Client, store EventStore) {
imgs, err := ociClient.ListImages(req.Context())
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
// Filter based on selected image
imageFilter := req.URL.Query().Get("image")
// TODO: Optimize with name lookup
include := []string{}
for _, img := range imgs {
if img.String() != imageFilter {
continue
}
ids, err := ociClient.AllIdentifiers(req.Context(), img)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
return
}
include = ids
break
}
store = store.FilterById(include)
// Filter based on direction
directionFilter := req.URL.Query().Get("direction")
if directionFilter != "" {
isRootSource, err := strconv.ParseBool(directionFilter)
if err != nil {
rw.WriteError(http.StatusBadRequest, err)
return
}
store = store.FilterByDirection(isRootSource)
}
gd := store.Graph()
b, err := json.Marshal(&gd)
if err != nil {
rw.WriteError(http.StatusInternalServerError, err)
}
rw.Write(b)
}

View File

@ -155,6 +155,8 @@ kubectl --kubeconfig $KIND_KUBECONFIG --namespace nginx wait --timeout=90s deplo
kubectl --kubeconfig $KIND_KUBECONFIG --namespace nginx wait --timeout=90s deployment/nginx-tag-and-digest --for condition=available
kubectl --kubeconfig $KIND_KUBECONFIG --namespace nginx wait --timeout=90s -l app=nginx-not-present --for jsonpath='{.status.containerStatuses[*].state.waiting.reason}'=ImagePullBackOff pod
exit 0
# Verify that Spegel has never restarted
RESTART_COUNT=$(kubectl --kubeconfig $KIND_KUBECONFIG --namespace spegel get pods -o=jsonpath='{.items[*].status.containerStatuses[0].restartCount}')
if [[ $RESTART_COUNT != "0 0 0 0" ]]