Merge pull request #452 from spegel-org/fix/subscribe-error

Fix Containerd subscribe returning on any error
This commit is contained in:
Philip Laine 2024-04-23 20:51:43 +02:00 committed by GitHub
commit fb0dcc771f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 22 additions and 18 deletions

View File

@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- [#452](https://github.com/spegel-org/spegel/pull/452) Fix Containerd Subscribe returning on any error.
### Security
- [#451](https://github.com/spegel-org/spegel/pull/451) Bump golang.org/x/net from 0.21.0 to 0.23.0.

View File

@ -136,50 +136,49 @@ func verifyStatusResponse(resp *runtimeapi.StatusResponse, configPath string) er
return fmt.Errorf("Containerd registry config path is %s but needs to contain path %s for mirror configuration to take effect", cfg.Registry.ConfigPath, configPath)
}
func (c *Containerd) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error) {
imgCh := make(chan ImageEvent, 1)
errCh := make(chan error, 1)
func (c *Containerd) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error) {
imgCh := make(chan ImageEvent)
errCh := make(chan error)
client, err := c.Client()
if err != nil {
errCh <- err
close(imgCh)
close(errCh)
return imgCh, errCh
return nil, nil, err
}
envelopeCh, cErrCh := client.EventService().Subscribe(ctx, c.eventFilter)
go func() {
defer func() {
close(imgCh)
close(errCh)
}()
for envelope := range envelopeCh {
var img Image
imageName, eventType, err := getEventImage(envelope.Event)
if err != nil {
errCh <- err
return
continue
}
switch eventType {
case CreateEvent, UpdateEvent:
cImg, err := client.GetImage(ctx, imageName)
if err != nil {
errCh <- err
return
continue
}
img, err = Parse(cImg.Name(), cImg.Target().Digest)
if err != nil {
errCh <- err
return
continue
}
case DeleteEvent:
img, err = Parse(imageName, "")
if err != nil {
errCh <- err
return
continue
}
}
imgCh <- ImageEvent{Image: img, Type: eventType}
}
}()
return imgCh, channel.Merge(errCh, cErrCh)
return imgCh, channel.Merge(errCh, cErrCh), nil
}
func (c *Containerd) ListImages(ctx context.Context) ([]Image, error) {

View File

@ -27,8 +27,8 @@ func (m *MockClient) Verify(ctx context.Context) error {
return nil
}
func (m *MockClient) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error) {
return nil, nil
func (m *MockClient) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error) {
return nil, nil, nil
}
func (m *MockClient) ListImages(ctx context.Context) ([]Image, error) {

View File

@ -14,7 +14,7 @@ type UnknownDocument struct {
type Client interface {
Name() string
Verify(ctx context.Context) error
Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error)
Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan error, error)
ListImages(ctx context.Context) ([]Image, error)
AllIdentifiers(ctx context.Context, img Image) ([]string, error)
Resolve(ctx context.Context, ref string) (digest.Digest, error)

View File

@ -16,7 +16,10 @@ import (
func Track(ctx context.Context, ociClient oci.Client, router routing.Router, resolveLatestTag bool) error {
log := logr.FromContextOrDiscard(ctx)
eventCh, errCh := ociClient.Subscribe(ctx)
eventCh, errCh, err := ociClient.Subscribe(ctx)
if err != nil {
return err
}
immediate := make(chan time.Time, 1)
immediate <- time.Now()
expirationTicker := time.NewTicker(routing.KeyTTL - time.Minute)