Refactor OCI events to support content events

Signed-off-by: Philip Laine <philip.laine@gmail.com>
This commit is contained in:
Philip Laine 2025-05-15 13:19:22 +02:00
parent 6c67bced55
commit 86c6941fbd
No known key found for this signature in database
GPG Key ID: F6D0B743CA3EFF33
7 changed files with 101 additions and 191 deletions

View File

@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#873](https://github.com/spegel-org/spegel/pull/873) Refactor web to use internal mux router.
- [#875](https://github.com/spegel-org/spegel/pull/875) Change debug unit formatting and add totals.
- [#880](https://github.com/spegel-org/spegel/pull/880) Refactor store advertisement to list content.
- [#888](https://github.com/spegel-org/spegel/pull/888) Refactor OCI events to support content events.
### Deprecated

View File

@ -18,6 +18,7 @@ import (
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/pkg/labels"
"github.com/containerd/errdefs"
"github.com/containerd/typeurl/v2"
@ -174,52 +175,42 @@ func verifyStatusResponse(resp *runtimeapi.StatusResponse, configPath string) er
return fmt.Errorf("Containerd registry config path is %s but needs to contain path %s for mirror configuration to take effect", *cfg.Registry.ConfigPath, configPath)
}
func (c *Containerd) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error) {
imgCh := make(chan ImageEvent)
errCh := make(chan error)
func (c *Containerd) Subscribe(ctx context.Context) (<-chan OCIEvent, error) {
client, err := c.Client()
if err != nil {
return nil, nil, err
return nil, err
}
log := logr.FromContextOrDiscard(ctx)
ctx, cancel := context.WithCancel(ctx)
eventCh := make(chan OCIEvent)
envelopeCh, cErrCh := client.EventService().Subscribe(ctx, c.eventFilter)
go func() {
for envelope := range envelopeCh {
var img Image
imageName, eventType, err := getEventImage(envelope.Event)
if err != nil {
errCh <- err
continue
}
switch eventType {
case CreateEvent, UpdateEvent:
cImg, err := client.GetImage(ctx, imageName)
defer close(eventCh)
for {
select {
case <-ctx.Done():
return
case envelope := <-envelopeCh:
events, err := c.convertEvent(ctx, *envelope)
if err != nil {
errCh <- err
log.Error(err, "error when handling event")
continue
}
img, err = ParseImageRequireDigest(cImg.Name(), cImg.Target().Digest)
if err != nil {
errCh <- err
continue
}
case DeleteEvent:
img, err = ParseImageRequireDigest(imageName, "")
if err != nil {
errCh <- err
continue
for _, event := range events {
eventCh <- event
}
}
imgCh <- ImageEvent{Image: img, Type: eventType}
}
close(imgCh)
}()
go func() {
// Required so that the event channel closes in case Containerd is restarted.
defer cancel()
for err := range cErrCh {
errCh <- err
log.Error(err, "containerd event error")
}
close(errCh)
}()
return imgCh, errCh, nil
return eventCh, nil
}
func (c *Containerd) ListImages(ctx context.Context) ([]Image, error) {
@ -341,6 +332,49 @@ func (c *Containerd) GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadSe
}, nil
}
func (c *Containerd) convertEvent(ctx context.Context, envelope events.Envelope) ([]OCIEvent, error) {
if envelope.Event == nil {
return nil, errors.New("envelope event cannot be nil")
}
evt, err := typeurl.UnmarshalAny(envelope.Event)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal envelope event: %w", err)
}
switch e := evt.(type) {
case *eventtypes.ImageCreate:
// Containerd creates an image create event for image config which we ignore.
if err := digest.Digest(e.GetName()).Validate(); err == nil {
return nil, nil
}
img, err := ParseImage(e.GetName())
if err != nil {
return nil, err
}
// Pull by tag creates an event only for the tag. We dont get content to avoid advertising twice.
if img.Digest == "" {
return []OCIEvent{{Type: CreateEvent, Key: e.GetName()}}, nil
}
// Walk the image to return events for all of the layers.
dgsts, err := WalkImage(ctx, c, img)
if err != nil {
return nil, fmt.Errorf("could not get digests for image %s: %w", img.String(), err)
}
events := []OCIEvent{}
for _, dgst := range dgsts {
events = append(events, OCIEvent{Type: CreateEvent, Key: dgst.String()})
}
return events, nil
case *eventtypes.ImageDelete:
// Ignore delete event created for image config.
if err := digest.Digest(e.GetName()).Validate(); err == nil {
return nil, nil
}
return []OCIEvent{{Type: DeleteEvent, Key: e.GetName()}}, nil
default:
return nil, errors.New("unsupported event type")
}
}
func parseContentRegistries(l map[string]string) []string {
registries := []string{}
for k := range l {
@ -352,26 +386,6 @@ func parseContentRegistries(l map[string]string) []string {
return registries
}
func getEventImage(e typeurl.Any) (string, EventType, error) {
if e == nil {
return "", "", errors.New("any cannot be nil")
}
evt, err := typeurl.UnmarshalAny(e)
if err != nil {
return "", "", fmt.Errorf("failed to unmarshal any: %w", err)
}
switch e := evt.(type) {
case *eventtypes.ImageCreate:
return e.Name, CreateEvent, nil
case *eventtypes.ImageUpdate:
return e.Name, UpdateEvent, nil
case *eventtypes.ImageDelete:
return e.Name, DeleteEvent, nil
default:
return "", "", errors.New("unsupported event type")
}
}
func createFilters(mirroredRegistries []url.URL) (string, string, string) {
registryHosts := []string{}
for _, registry := range mirroredRegistries {
@ -383,7 +397,7 @@ func createFilters(mirroredRegistries []url.URL) (string, string, string) {
// as we cant mirror images without registries.
imageFilter = `name~="^.+/"`
}
eventFilter := fmt.Sprintf(`topic~="/images/create|/images/update|/images/delete",event.%s`, imageFilter)
eventFilter := fmt.Sprintf(`topic~="/images/create|/images/delete",event.%s`, imageFilter)
contentFilters := []string{}
for _, registry := range mirroredRegistries {
contentFilters = append(contentFilters, fmt.Sprintf(`labels."%s.%s"~="^."`, labels.LabelDistributionSource, registry.Host))

View File

@ -9,8 +9,6 @@ import (
"path/filepath"
"testing"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/typeurl/v2"
"github.com/go-logr/logr"
"github.com/stretchr/testify/require"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -260,14 +258,14 @@ func TestCreateFilter(t *testing.T) {
name: "with registry filtering",
registries: []string{"https://docker.io", "https://gcr.io"},
expectedImageFilter: `name~="^(docker\\.io|gcr\\.io)/"`,
expectedEventFilter: `topic~="/images/create|/images/update|/images/delete",event.name~="^(docker\\.io|gcr\\.io)/"`,
expectedEventFilter: `topic~="/images/create|/images/delete",event.name~="^(docker\\.io|gcr\\.io)/"`,
expectedContentFilter: `labels."containerd.io/distribution.source.docker.io"~="^." labels."containerd.io/distribution.source.gcr.io"~="^."`,
},
{
name: "without registry filtering",
registries: []string{},
expectedImageFilter: `name~="^.+/"`,
expectedEventFilter: `topic~="/images/create|/images/update|/images/delete",event.name~="^.+/"`,
expectedEventFilter: `topic~="/images/create|/images/delete",event.name~="^.+/"`,
expectedContentFilter: "",
},
}
@ -283,74 +281,6 @@ func TestCreateFilter(t *testing.T) {
}
}
func TestGetEventImage(t *testing.T) {
t.Parallel()
tests := []struct {
name string
data any
expectedErr string
expectedName string
expectedEventType EventType
}{
{
name: "type url is nil",
data: nil,
expectedErr: "any cannot be nil",
},
{
name: "unknown event",
data: &eventtypes.ContainerCreate{},
expectedErr: "unsupported event type",
},
{
name: "create event",
data: &eventtypes.ImageCreate{
Name: "create",
},
expectedName: "create",
expectedEventType: CreateEvent,
},
{
name: "update event",
data: &eventtypes.ImageUpdate{
Name: "update",
},
expectedName: "update",
expectedEventType: UpdateEvent,
},
{
name: "delete event",
data: &eventtypes.ImageDelete{
Name: "delete",
},
expectedName: "delete",
expectedEventType: DeleteEvent,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
var e typeurl.Any
var err error
if tt.data != nil {
e, err = typeurl.MarshalAny(tt.data)
require.NoError(t, err)
}
name, event, err := getEventImage(e)
if tt.expectedErr != "" {
require.EqualError(t, err, tt.expectedErr)
return
}
require.NoError(t, err)
require.Equal(t, tt.expectedName, name)
require.Equal(t, tt.expectedEventType, event)
})
}
}
func TestMirrorConfiguration(t *testing.T) {
t.Parallel()

View File

@ -36,8 +36,8 @@ func (m *Memory) Verify(ctx context.Context) error {
return nil
}
func (m *Memory) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error) {
return nil, nil, nil
func (m *Memory) Subscribe(ctx context.Context) (<-chan OCIEvent, error) {
return nil, nil
}
func (m *Memory) ListImages(ctx context.Context) ([]Image, error) {

View File

@ -21,13 +21,12 @@ type EventType string
const (
CreateEvent EventType = "CREATE"
UpdateEvent EventType = "UPDATE"
DeleteEvent EventType = "DELETE"
)
type ImageEvent struct {
Image Image
Type EventType
type OCIEvent struct {
Type EventType
Key string
}
type Content struct {
@ -43,7 +42,7 @@ type Store interface {
Verify(ctx context.Context) error
// Subscribe will notify for any image events ocuring in the store backend.
Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error)
Subscribe(ctx context.Context) (<-chan OCIEvent, error)
// ListImages returns a list of all local images.
ListImages(ctx context.Context) ([]Image, error)
@ -102,14 +101,14 @@ func DetermineMediaType(b []byte) (string, error) {
return "", errors.New("not able to determine media type")
}
func WalkImage(ctx context.Context, store Store, img Image) ([]string, error) {
keys := []string{}
func WalkImage(ctx context.Context, store Store, img Image) ([]digest.Digest, error) {
dgsts := []digest.Digest{}
err := walk(ctx, []digest.Digest{img.Digest}, func(dgst digest.Digest) ([]digest.Digest, error) {
b, mt, err := store.GetManifest(ctx, dgst)
if err != nil {
return nil, err
}
keys = append(keys, dgst.String())
dgsts = append(dgsts, dgst)
switch mt {
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
var idx ocispec.Index
@ -137,9 +136,9 @@ func WalkImage(ctx context.Context, store Store, img Image) ([]string, error) {
if err != nil {
return nil, err
}
keys = append(keys, manifest.Config.Digest.String())
dgsts = append(dgsts, manifest.Config.Digest)
for _, layer := range manifest.Layers {
keys = append(keys, layer.Digest.String())
dgsts = append(dgsts, layer.Digest)
}
return nil, nil
default:
@ -149,10 +148,10 @@ func WalkImage(ctx context.Context, store Store, img Image) ([]string, error) {
if err != nil {
return nil, fmt.Errorf("failed to walk image manifests: %w", err)
}
if len(keys) == 0 {
if len(dgsts) == 0 {
return nil, errors.New("no image digests found")
}
return keys, nil
return dgsts, nil
}
func walk(ctx context.Context, dgsts []digest.Digest, handler func(dgst digest.Digest) ([]digest.Digest, error)) error {

View File

@ -184,14 +184,14 @@ func TestOCIClient(t *testing.T) {
}
identifiersTests := []struct {
imageName string
imageDigest string
expectedKeys []string
imageName string
imageDigest string
expectedDgsts []digest.Digest
}{
{
imageName: "ghcr.io/spegel-org/spegel:v0.0.8-with-media-type",
imageDigest: "sha256:9506c8e7a2d0a098d43cadfd7ecdc3c91697e8188d3a1245943b669f717747b4",
expectedKeys: []string{
expectedDgsts: []digest.Digest{
"sha256:9506c8e7a2d0a098d43cadfd7ecdc3c91697e8188d3a1245943b669f717747b4",
"sha256:44cb2cf712c060f69df7310e99339c1eb51a085446f1bb6d44469acff35b4355",
"sha256:d715ba0d85ee7d37da627d0679652680ed2cb23dde6120f25143a0b8079ee47e",
@ -237,7 +237,7 @@ func TestOCIClient(t *testing.T) {
{
imageName: "ghcr.io/spegel-org/spegel:v0.0.8-without-media-type",
imageDigest: "sha256:d8df04365d06181f037251de953aca85cc16457581a8fc168f4957c978e1008b",
expectedKeys: []string{
expectedDgsts: []digest.Digest{
"sha256:d8df04365d06181f037251de953aca85cc16457581a8fc168f4957c978e1008b",
"sha256:44cb2cf712c060f69df7310e99339c1eb51a085446f1bb6d44469acff35b4355",
"sha256:d715ba0d85ee7d37da627d0679652680ed2cb23dde6120f25143a0b8079ee47e",
@ -287,9 +287,9 @@ func TestOCIClient(t *testing.T) {
img, err := ParseImageRequireDigest(tt.imageName, digest.Digest(tt.imageDigest))
require.NoError(t, err)
keys, err := WalkImage(ctx, ociStore, img)
dgsts, err := WalkImage(ctx, ociStore, img)
require.NoError(t, err)
require.Equal(t, tt.expectedKeys, keys)
require.Equal(t, tt.expectedDgsts, dgsts)
})
}
})

View File

@ -3,7 +3,6 @@ package state
import (
"context"
"errors"
"fmt"
"time"
"github.com/go-logr/logr"
@ -16,7 +15,7 @@ import (
func Track(ctx context.Context, ociStore oci.Store, router routing.Router, resolveLatestTag bool) error {
log := logr.FromContextOrDiscard(ctx)
eventCh, errCh, err := ociStore.Subscribe(ctx)
eventCh, err := ociStore.Subscribe(ctx)
if err != nil {
return err
}
@ -31,7 +30,7 @@ func Track(ctx context.Context, ociStore oci.Store, router routing.Router, resol
case <-ctx.Done():
return nil
case <-tickerCh:
log.Info("running tick state update")
log.Info("running state update")
err := tick(ctx, ociStore, router, resolveLatestTag)
if err != nil {
log.Error(err, "received errors when updating all images")
@ -39,18 +38,14 @@ func Track(ctx context.Context, ociStore oci.Store, router routing.Router, resol
}
case event, ok := <-eventCh:
if !ok {
return errors.New("image event channel closed")
return errors.New("event channel closed")
}
log.Info("received image event", "image", event.Image.String(), "type", event.Type)
if _, err := update(ctx, ociStore, router, event, false, resolveLatestTag); err != nil {
log.Error(err, "received error when updating image")
log.Info("OCI event", "key", event.Key, "type", event.Type)
err := handle(ctx, router, event)
if err != nil {
log.Error(err, "could not handle event")
continue
}
case err, ok := <-errCh:
if !ok {
return errors.New("image error channel closed")
}
log.Error(err, "event channel error")
}
}
}
@ -112,42 +107,13 @@ func tick(ctx context.Context, ociStore oci.Store, router routing.Router, resolv
return nil
}
func update(ctx context.Context, ociStore oci.Store, router routing.Router, event oci.ImageEvent, skipDigests, resolveLatestTag bool) (int, error) {
keys := []string{}
//nolint: staticcheck // Simplify in future.
if !(!resolveLatestTag && event.Image.IsLatestTag()) {
if tagName, ok := event.Image.TagName(); ok {
keys = append(keys, tagName)
}
func handle(ctx context.Context, router routing.Router, event oci.OCIEvent) error {
if event.Type != oci.CreateEvent {
return nil
}
if event.Type == oci.DeleteEvent {
// We don't know how many digest keys were associated with the deleted image;
// that can only be updated by the full image list sync in all().
metrics.AdvertisedImages.WithLabelValues(event.Image.Registry).Sub(1)
// DHT doesn't actually have any way to stop providing a key, you just have to wait for the record to expire
// from the datastore. Record TTL is a datastore-level value, so we can't even re-provide with a shorter TTL.
return 0, nil
}
if !skipDigests {
dgsts, err := oci.WalkImage(ctx, ociStore, event.Image)
if err != nil {
return 0, fmt.Errorf("could not get digests for image %s: %w", event.Image.String(), err)
}
keys = append(keys, dgsts...)
}
err := router.Advertise(ctx, keys)
err := router.Advertise(ctx, []string{event.Key})
if err != nil {
return 0, fmt.Errorf("could not advertise image %s: %w", event.Image.String(), err)
return err
}
if event.Type == oci.CreateEvent {
// We don't know how many unique digest keys will be associated with the new image;
// that can only be updated by the full image list sync in all().
metrics.AdvertisedImages.WithLabelValues(event.Image.Registry).Add(1)
if event.Image.Tag == "" {
metrics.AdvertisedImageDigests.WithLabelValues(event.Image.Registry).Add(1)
} else {
metrics.AdvertisedImageTags.WithLabelValues(event.Image.Registry).Add(1)
}
}
return len(keys), nil
return nil
}