diff --git a/CHANGELOG.md b/CHANGELOG.md index bd99049..66a48df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#743](https://github.com/spegel-org/spegel/pull/743) Charts - removed metrics label from bootstrap service - [#748](https://github.com/spegel-org/spegel/pull/748) Fix topology annotation. - [#785](https://github.com/spegel-org/spegel/pull/785) Fix verification of digests when parsing distribution path. +- [#798](https://github.com/spegel-org/spegel/pull/798) Restart Spegel if Containerd event subscription is disconnected. ### Security diff --git a/pkg/oci/containerd.go b/pkg/oci/containerd.go index 5452ec2..26f66e0 100644 --- a/pkg/oci/containerd.go +++ b/pkg/oci/containerd.go @@ -29,8 +29,6 @@ import ( "github.com/spf13/afero" "google.golang.org/grpc" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - - "github.com/spegel-org/spegel/internal/channel" ) const ( @@ -179,10 +177,6 @@ func (c *Containerd) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan e } envelopeCh, cErrCh := client.EventService().Subscribe(ctx, c.eventFilter) go func() { - defer func() { - close(imgCh) - close(errCh) - }() for envelope := range envelopeCh { var img Image imageName, eventType, err := getEventImage(envelope.Event) @@ -211,8 +205,15 @@ func (c *Containerd) Subscribe(ctx context.Context) (<-chan ImageEvent, <-chan e } imgCh <- ImageEvent{Image: img, Type: eventType} } + close(imgCh) }() - return imgCh, channel.Merge(errCh, cErrCh), nil + go func() { + for err := range cErrCh { + errCh <- err + } + close(errCh) + }() + return imgCh, errCh, nil } func (c *Containerd) ListImages(ctx context.Context) ([]Image, error) { diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 81c5043..a71bbc7 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strings" "testing" + "time" "golang.org/x/sync/errgroup" @@ -162,6 +163,14 @@ func TestE2E(t *testing.T) { // Verify that Spegel has never restarted restartOutput = command(ctx, t, fmt.Sprintf("kubectl --kubeconfig %s --namespace spegel get pods -o=jsonpath='{.items[*].status.containerStatuses[0].restartCount}'", kcPath)) require.Equal(t, "0", restartOutput) + + // Restart Containerd and verify that Spegel restarts + t.Log("Restarting Containerd") + command(ctx, t, fmt.Sprintf("docker exec %s-worker3 systemctl restart containerd", kindName)) + require.Eventually(t, func() bool { + restartOutput = command(ctx, t, fmt.Sprintf("kubectl --kubeconfig %s --namespace spegel get pods -o=jsonpath='{.items[*].status.containerStatuses[0].restartCount}'", kcPath)) + return restartOutput == "1" + }, 5*time.Second, 1*time.Second) } func TestDevDeploy(t *testing.T) {