diff --git a/CHANGELOG.md b/CHANGELOG.md index 06bbf29..43ec7b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#873](https://github.com/spegel-org/spegel/pull/873) Refactor web to use internal mux router. - [#875](https://github.com/spegel-org/spegel/pull/875) Change debug unit formatting and add totals. - [#880](https://github.com/spegel-org/spegel/pull/880) Refactor store advertisement to list content. +- [#888](https://github.com/spegel-org/spegel/pull/888) Refactor OCI events to support content events. ### Deprecated diff --git a/pkg/oci/containerd.go b/pkg/oci/containerd.go index 710b7e0..f7699c2 100644 --- a/pkg/oci/containerd.go +++ b/pkg/oci/containerd.go @@ -18,6 +18,7 @@ import ( eventtypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/content" + "github.com/containerd/containerd/v2/core/events" "github.com/containerd/containerd/v2/pkg/labels" "github.com/containerd/errdefs" "github.com/containerd/typeurl/v2" @@ -174,52 +175,42 @@ func verifyStatusResponse(resp *runtimeapi.StatusResponse, configPath string) er return fmt.Errorf("Containerd registry config path is %s but needs to contain path %s for mirror configuration to take effect", *cfg.Registry.ConfigPath, configPath) } -func (c *Containerd) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error) { - imgCh := make(chan ImageEvent) - errCh := make(chan error) +func (c *Containerd) Subscribe(ctx context.Context) (<-chan OCIEvent, error) { client, err := c.Client() if err != nil { - return nil, nil, err + return nil, err } + log := logr.FromContextOrDiscard(ctx) + + ctx, cancel := context.WithCancel(ctx) + eventCh := make(chan OCIEvent) envelopeCh, cErrCh := client.EventService().Subscribe(ctx, c.eventFilter) go func() { - for envelope := range envelopeCh { - var img Image - imageName, eventType, err := getEventImage(envelope.Event) - if err != nil { - errCh <- err - continue - } - switch eventType { - case CreateEvent, UpdateEvent: - cImg, err := client.GetImage(ctx, imageName) + defer close(eventCh) + for { + select { + case <-ctx.Done(): + return + case envelope := <-envelopeCh: + events, err := c.convertEvent(ctx, *envelope) if err != nil { - errCh <- err + log.Error(err, "error when handling event") continue } - img, err = ParseImageRequireDigest(cImg.Name(), cImg.Target().Digest) - if err != nil { - errCh <- err - continue - } - case DeleteEvent: - img, err = ParseImageRequireDigest(imageName, "") - if err != nil { - errCh <- err - continue + for _, event := range events { + eventCh <- event } } - imgCh <- ImageEvent{Image: img, Type: eventType} } - close(imgCh) }() go func() { + // Required so that the event channel closes in case Containerd is restarted. + defer cancel() for err := range cErrCh { - errCh <- err + log.Error(err, "containerd event error") } - close(errCh) }() - return imgCh, errCh, nil + return eventCh, nil } func (c *Containerd) ListImages(ctx context.Context) ([]Image, error) { @@ -341,6 +332,49 @@ func (c *Containerd) GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadSe }, nil } +func (c *Containerd) convertEvent(ctx context.Context, envelope events.Envelope) ([]OCIEvent, error) { + if envelope.Event == nil { + return nil, errors.New("envelope event cannot be nil") + } + evt, err := typeurl.UnmarshalAny(envelope.Event) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal envelope event: %w", err) + } + switch e := evt.(type) { + case *eventtypes.ImageCreate: + // Containerd creates an image create event for image config which we ignore. + if err := digest.Digest(e.GetName()).Validate(); err == nil { + return nil, nil + } + img, err := ParseImage(e.GetName()) + if err != nil { + return nil, err + } + // Pull by tag creates an event only for the tag. We dont get content to avoid advertising twice. + if img.Digest == "" { + return []OCIEvent{{Type: CreateEvent, Key: e.GetName()}}, nil + } + // Walk the image to return events for all of the layers. + dgsts, err := WalkImage(ctx, c, img) + if err != nil { + return nil, fmt.Errorf("could not get digests for image %s: %w", img.String(), err) + } + events := []OCIEvent{} + for _, dgst := range dgsts { + events = append(events, OCIEvent{Type: CreateEvent, Key: dgst.String()}) + } + return events, nil + case *eventtypes.ImageDelete: + // Ignore delete event created for image config. + if err := digest.Digest(e.GetName()).Validate(); err == nil { + return nil, nil + } + return []OCIEvent{{Type: DeleteEvent, Key: e.GetName()}}, nil + default: + return nil, errors.New("unsupported event type") + } +} + func parseContentRegistries(l map[string]string) []string { registries := []string{} for k := range l { @@ -352,26 +386,6 @@ func parseContentRegistries(l map[string]string) []string { return registries } -func getEventImage(e typeurl.Any) (string, EventType, error) { - if e == nil { - return "", "", errors.New("any cannot be nil") - } - evt, err := typeurl.UnmarshalAny(e) - if err != nil { - return "", "", fmt.Errorf("failed to unmarshal any: %w", err) - } - switch e := evt.(type) { - case *eventtypes.ImageCreate: - return e.Name, CreateEvent, nil - case *eventtypes.ImageUpdate: - return e.Name, UpdateEvent, nil - case *eventtypes.ImageDelete: - return e.Name, DeleteEvent, nil - default: - return "", "", errors.New("unsupported event type") - } -} - func createFilters(mirroredRegistries []url.URL) (string, string, string) { registryHosts := []string{} for _, registry := range mirroredRegistries { @@ -383,7 +397,7 @@ func createFilters(mirroredRegistries []url.URL) (string, string, string) { // as we cant mirror images without registries. imageFilter = `name~="^.+/"` } - eventFilter := fmt.Sprintf(`topic~="/images/create|/images/update|/images/delete",event.%s`, imageFilter) + eventFilter := fmt.Sprintf(`topic~="/images/create|/images/delete",event.%s`, imageFilter) contentFilters := []string{} for _, registry := range mirroredRegistries { contentFilters = append(contentFilters, fmt.Sprintf(`labels."%s.%s"~="^."`, labels.LabelDistributionSource, registry.Host)) diff --git a/pkg/oci/containerd_test.go b/pkg/oci/containerd_test.go index df77c4c..7b79971 100644 --- a/pkg/oci/containerd_test.go +++ b/pkg/oci/containerd_test.go @@ -9,8 +9,6 @@ import ( "path/filepath" "testing" - eventtypes "github.com/containerd/containerd/api/events" - "github.com/containerd/typeurl/v2" "github.com/go-logr/logr" "github.com/stretchr/testify/require" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -260,14 +258,14 @@ func TestCreateFilter(t *testing.T) { name: "with registry filtering", registries: []string{"https://docker.io", "https://gcr.io"}, expectedImageFilter: `name~="^(docker\\.io|gcr\\.io)/"`, - expectedEventFilter: `topic~="/images/create|/images/update|/images/delete",event.name~="^(docker\\.io|gcr\\.io)/"`, + expectedEventFilter: `topic~="/images/create|/images/delete",event.name~="^(docker\\.io|gcr\\.io)/"`, expectedContentFilter: `labels."containerd.io/distribution.source.docker.io"~="^." labels."containerd.io/distribution.source.gcr.io"~="^."`, }, { name: "without registry filtering", registries: []string{}, expectedImageFilter: `name~="^.+/"`, - expectedEventFilter: `topic~="/images/create|/images/update|/images/delete",event.name~="^.+/"`, + expectedEventFilter: `topic~="/images/create|/images/delete",event.name~="^.+/"`, expectedContentFilter: "", }, } @@ -283,74 +281,6 @@ func TestCreateFilter(t *testing.T) { } } -func TestGetEventImage(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - data any - expectedErr string - expectedName string - expectedEventType EventType - }{ - { - name: "type url is nil", - data: nil, - expectedErr: "any cannot be nil", - }, - { - name: "unknown event", - data: &eventtypes.ContainerCreate{}, - expectedErr: "unsupported event type", - }, - { - name: "create event", - data: &eventtypes.ImageCreate{ - Name: "create", - }, - expectedName: "create", - expectedEventType: CreateEvent, - }, - { - name: "update event", - data: &eventtypes.ImageUpdate{ - Name: "update", - }, - expectedName: "update", - expectedEventType: UpdateEvent, - }, - { - name: "delete event", - data: &eventtypes.ImageDelete{ - Name: "delete", - }, - expectedName: "delete", - expectedEventType: DeleteEvent, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - var e typeurl.Any - var err error - if tt.data != nil { - e, err = typeurl.MarshalAny(tt.data) - require.NoError(t, err) - } - - name, event, err := getEventImage(e) - if tt.expectedErr != "" { - require.EqualError(t, err, tt.expectedErr) - return - } - require.NoError(t, err) - require.Equal(t, tt.expectedName, name) - require.Equal(t, tt.expectedEventType, event) - }) - } -} - func TestMirrorConfiguration(t *testing.T) { t.Parallel() diff --git a/pkg/oci/memory.go b/pkg/oci/memory.go index 7b7f9fb..6260033 100644 --- a/pkg/oci/memory.go +++ b/pkg/oci/memory.go @@ -36,8 +36,8 @@ func (m *Memory) Verify(ctx context.Context) error { return nil } -func (m *Memory) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error) { - return nil, nil, nil +func (m *Memory) Subscribe(ctx context.Context) (<-chan OCIEvent, error) { + return nil, nil } func (m *Memory) ListImages(ctx context.Context) ([]Image, error) { diff --git a/pkg/oci/oci.go b/pkg/oci/oci.go index 52b3677..dc36c96 100644 --- a/pkg/oci/oci.go +++ b/pkg/oci/oci.go @@ -21,13 +21,12 @@ type EventType string const ( CreateEvent EventType = "CREATE" - UpdateEvent EventType = "UPDATE" DeleteEvent EventType = "DELETE" ) -type ImageEvent struct { - Image Image - Type EventType +type OCIEvent struct { + Type EventType + Key string } type Content struct { @@ -43,7 +42,7 @@ type Store interface { Verify(ctx context.Context) error // Subscribe will notify for any image events ocuring in the store backend. - Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error) + Subscribe(ctx context.Context) (<-chan OCIEvent, error) // ListImages returns a list of all local images. ListImages(ctx context.Context) ([]Image, error) @@ -102,14 +101,14 @@ func DetermineMediaType(b []byte) (string, error) { return "", errors.New("not able to determine media type") } -func WalkImage(ctx context.Context, store Store, img Image) ([]string, error) { - keys := []string{} +func WalkImage(ctx context.Context, store Store, img Image) ([]digest.Digest, error) { + dgsts := []digest.Digest{} err := walk(ctx, []digest.Digest{img.Digest}, func(dgst digest.Digest) ([]digest.Digest, error) { b, mt, err := store.GetManifest(ctx, dgst) if err != nil { return nil, err } - keys = append(keys, dgst.String()) + dgsts = append(dgsts, dgst) switch mt { case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: var idx ocispec.Index @@ -137,9 +136,9 @@ func WalkImage(ctx context.Context, store Store, img Image) ([]string, error) { if err != nil { return nil, err } - keys = append(keys, manifest.Config.Digest.String()) + dgsts = append(dgsts, manifest.Config.Digest) for _, layer := range manifest.Layers { - keys = append(keys, layer.Digest.String()) + dgsts = append(dgsts, layer.Digest) } return nil, nil default: @@ -149,10 +148,10 @@ func WalkImage(ctx context.Context, store Store, img Image) ([]string, error) { if err != nil { return nil, fmt.Errorf("failed to walk image manifests: %w", err) } - if len(keys) == 0 { + if len(dgsts) == 0 { return nil, errors.New("no image digests found") } - return keys, nil + return dgsts, nil } func walk(ctx context.Context, dgsts []digest.Digest, handler func(dgst digest.Digest) ([]digest.Digest, error)) error { diff --git a/pkg/oci/oci_test.go b/pkg/oci/oci_test.go index a0c1a7a..968a7ca 100644 --- a/pkg/oci/oci_test.go +++ b/pkg/oci/oci_test.go @@ -184,14 +184,14 @@ func TestOCIClient(t *testing.T) { } identifiersTests := []struct { - imageName string - imageDigest string - expectedKeys []string + imageName string + imageDigest string + expectedDgsts []digest.Digest }{ { imageName: "ghcr.io/spegel-org/spegel:v0.0.8-with-media-type", imageDigest: "sha256:9506c8e7a2d0a098d43cadfd7ecdc3c91697e8188d3a1245943b669f717747b4", - expectedKeys: []string{ + expectedDgsts: []digest.Digest{ "sha256:9506c8e7a2d0a098d43cadfd7ecdc3c91697e8188d3a1245943b669f717747b4", "sha256:44cb2cf712c060f69df7310e99339c1eb51a085446f1bb6d44469acff35b4355", "sha256:d715ba0d85ee7d37da627d0679652680ed2cb23dde6120f25143a0b8079ee47e", @@ -237,7 +237,7 @@ func TestOCIClient(t *testing.T) { { imageName: "ghcr.io/spegel-org/spegel:v0.0.8-without-media-type", imageDigest: "sha256:d8df04365d06181f037251de953aca85cc16457581a8fc168f4957c978e1008b", - expectedKeys: []string{ + expectedDgsts: []digest.Digest{ "sha256:d8df04365d06181f037251de953aca85cc16457581a8fc168f4957c978e1008b", "sha256:44cb2cf712c060f69df7310e99339c1eb51a085446f1bb6d44469acff35b4355", "sha256:d715ba0d85ee7d37da627d0679652680ed2cb23dde6120f25143a0b8079ee47e", @@ -287,9 +287,9 @@ func TestOCIClient(t *testing.T) { img, err := ParseImageRequireDigest(tt.imageName, digest.Digest(tt.imageDigest)) require.NoError(t, err) - keys, err := WalkImage(ctx, ociStore, img) + dgsts, err := WalkImage(ctx, ociStore, img) require.NoError(t, err) - require.Equal(t, tt.expectedKeys, keys) + require.Equal(t, tt.expectedDgsts, dgsts) }) } }) diff --git a/pkg/state/state.go b/pkg/state/state.go index 5b596f5..9005d04 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -3,7 +3,6 @@ package state import ( "context" "errors" - "fmt" "time" "github.com/go-logr/logr" @@ -16,7 +15,7 @@ import ( func Track(ctx context.Context, ociStore oci.Store, router routing.Router, resolveLatestTag bool) error { log := logr.FromContextOrDiscard(ctx) - eventCh, errCh, err := ociStore.Subscribe(ctx) + eventCh, err := ociStore.Subscribe(ctx) if err != nil { return err } @@ -31,7 +30,7 @@ func Track(ctx context.Context, ociStore oci.Store, router routing.Router, resol case <-ctx.Done(): return nil case <-tickerCh: - log.Info("running tick state update") + log.Info("running state update") err := tick(ctx, ociStore, router, resolveLatestTag) if err != nil { log.Error(err, "received errors when updating all images") @@ -39,18 +38,14 @@ func Track(ctx context.Context, ociStore oci.Store, router routing.Router, resol } case event, ok := <-eventCh: if !ok { - return errors.New("image event channel closed") + return errors.New("event channel closed") } - log.Info("received image event", "image", event.Image.String(), "type", event.Type) - if _, err := update(ctx, ociStore, router, event, false, resolveLatestTag); err != nil { - log.Error(err, "received error when updating image") + log.Info("OCI event", "key", event.Key, "type", event.Type) + err := handle(ctx, router, event) + if err != nil { + log.Error(err, "could not handle event") continue } - case err, ok := <-errCh: - if !ok { - return errors.New("image error channel closed") - } - log.Error(err, "event channel error") } } } @@ -112,42 +107,13 @@ func tick(ctx context.Context, ociStore oci.Store, router routing.Router, resolv return nil } -func update(ctx context.Context, ociStore oci.Store, router routing.Router, event oci.ImageEvent, skipDigests, resolveLatestTag bool) (int, error) { - keys := []string{} - //nolint: staticcheck // Simplify in future. - if !(!resolveLatestTag && event.Image.IsLatestTag()) { - if tagName, ok := event.Image.TagName(); ok { - keys = append(keys, tagName) - } +func handle(ctx context.Context, router routing.Router, event oci.OCIEvent) error { + if event.Type != oci.CreateEvent { + return nil } - if event.Type == oci.DeleteEvent { - // We don't know how many digest keys were associated with the deleted image; - // that can only be updated by the full image list sync in all(). - metrics.AdvertisedImages.WithLabelValues(event.Image.Registry).Sub(1) - // DHT doesn't actually have any way to stop providing a key, you just have to wait for the record to expire - // from the datastore. Record TTL is a datastore-level value, so we can't even re-provide with a shorter TTL. - return 0, nil - } - if !skipDigests { - dgsts, err := oci.WalkImage(ctx, ociStore, event.Image) - if err != nil { - return 0, fmt.Errorf("could not get digests for image %s: %w", event.Image.String(), err) - } - keys = append(keys, dgsts...) - } - err := router.Advertise(ctx, keys) + err := router.Advertise(ctx, []string{event.Key}) if err != nil { - return 0, fmt.Errorf("could not advertise image %s: %w", event.Image.String(), err) + return err } - if event.Type == oci.CreateEvent { - // We don't know how many unique digest keys will be associated with the new image; - // that can only be updated by the full image list sync in all(). - metrics.AdvertisedImages.WithLabelValues(event.Image.Registry).Add(1) - if event.Image.Tag == "" { - metrics.AdvertisedImageDigests.WithLabelValues(event.Image.Registry).Add(1) - } else { - metrics.AdvertisedImageTags.WithLabelValues(event.Image.Registry).Add(1) - } - } - return len(keys), nil + return nil }