Skip to content

Commit

Permalink
fix: volume discovery improvements
Browse files Browse the repository at this point in the history
Use shared locks, discover more partitions, some other small changes.

Re-enable the flaky test.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Jun 6, 2024
1 parent 80ca8ff commit 7c9a143
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 30 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ require (
github.com/siderolabs/gen v0.5.0
github.com/siderolabs/go-api-signature v0.3.2
github.com/siderolabs/go-blockdevice v0.4.7
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa
github.com/siderolabs/go-circular v0.2.0
github.com/siderolabs/go-cmd v0.1.1
github.com/siderolabs/go-copy v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,8 @@ github.com/siderolabs/go-api-signature v0.3.2 h1:blqrZF1GM7TWgq7mY7CsR+yQ93u6az0
github.com/siderolabs/go-api-signature v0.3.2/go.mod h1:punhUOaXa7LELYBRCUhfgUGH6ieVz68GrP98apCKXj8=
github.com/siderolabs/go-blockdevice v0.4.7 h1:2bk4WpEEflGxjrNwp57ye24Pr+cYgAiAeNMWiQOuWbQ=
github.com/siderolabs/go-blockdevice v0.4.7/go.mod h1:4PeOuk71pReJj1JQEXDE7kIIQJPVe8a+HZQa+qjxSEA=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192 h1:16/cHDGhTUDBtfIftOkuHWhJcQdpa/FwwWPcTq4aOxc=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192/go.mod h1:UBbbc+L7hU0UggOQeKCA+Qp3ImGkSeaLfVOiCbxRxEI=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa h1:OjQLrcis/GuqaqxnIw2dxp4ZzT/zk5p1GI3NxcMxkrA=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa/go.mod h1:5GnL7VLNp5/vgiwYP74fi6KuTUfqGcRxQxtto2tzD+I=
github.com/siderolabs/go-circular v0.2.0 h1:Xca8zrjF/YsujLbwDSojkKzJe7ngetnpuIJn8N78DJI=
github.com/siderolabs/go-circular v0.2.0/go.mod h1:rrYCwHLYWmxqrmZP+LjYtwB2a55lxzQi0Ztu1VpWZSc=
github.com/siderolabs/go-cmd v0.1.1 h1:nTouZUSxLeiiEe7hFexSVvaTsY/3O8k1s08BxPRrsps=
Expand Down
50 changes: 36 additions & 14 deletions internal/app/machined/pkg/controllers/block/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package block

import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -74,11 +75,17 @@ func (ctrl *DiscoveryController) Run(ctx context.Context, r controller.Runtime,
continue
}

if err := ctrl.rescan(ctx, r, logger, maps.Keys(nextRescan)); err != nil {
logger.Debug("rescanning devices", zap.Strings("devices", maps.Keys(nextRescan)))

if nextRescanBatch, err := ctrl.rescan(ctx, r, logger, maps.Keys(nextRescan)); err != nil {
return fmt.Errorf("failed to rescan devices: %w", err)
}
} else {
nextRescan = map[string]int{}

nextRescan = map[string]int{}
for id := range nextRescanBatch {
nextRescan[id] = lastObservedGenerations[id]
}
}
case <-r.EventCh():
devices, err := safe.ReaderListAll[*block.Device](ctx, r)
if err != nil {
Expand Down Expand Up @@ -125,10 +132,11 @@ func (ctrl *DiscoveryController) Run(ctx context.Context, r controller.Runtime,
}
}

//nolint:gocyclo
func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtime, logger *zap.Logger, ids []string) error {
//nolint:gocyclo,cyclop
func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtime, logger *zap.Logger, ids []string) (map[string]struct{}, error) {
failedIDs := map[string]struct{}{}
touchedIDs := map[string]struct{}{}
nextRescan := map[string]struct{}{}

for _, id := range ids {
device, err := safe.ReaderGetByID[*block.Device](ctx, r, id)
Expand All @@ -139,18 +147,27 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
continue
}

return fmt.Errorf("failed to get device: %w", err)
return nil, fmt.Errorf("failed to get device: %w", err)
}

info, err := blkid.ProbePath(filepath.Join("/dev", id))
info, err := blkid.ProbePath(filepath.Join("/dev", id), blkid.WithProbeLogger(logger.With(zap.String("device", id))))
if err != nil {
logger.Debug("failed to probe device", zap.String("id", id), zap.Error(err))
if errors.Is(err, blkid.ErrFailedLock) {
// failed to lock the blockdevice, retry later
logger.Debug("failed to lock device, retrying later", zap.String("id", id))

failedIDs[id] = struct{}{}
nextRescan[id] = struct{}{}
} else {
logger.Debug("failed to probe device", zap.String("id", id), zap.Error(err))

failedIDs[id] = struct{}{}
}

continue
}

logger.Debug("probed device", zap.String("id", id), zap.Any("info", info))

if err = safe.WriterModify(ctx, r, block.NewDiscoveredVolume(block.NamespaceName, id), func(dv *block.DiscoveredVolume) error {
dv.TypedSpec().Type = device.TypedSpec().Type
dv.TypedSpec().DevicePath = device.TypedSpec().DevicePath
Expand All @@ -164,7 +181,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim

return nil
}); err != nil {
return fmt.Errorf("failed to write discovered volume: %w", err)
return nil, fmt.Errorf("failed to write discovered volume: %w", err)
}

touchedIDs[id] = struct{}{}
Expand All @@ -178,6 +195,11 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
dv.TypedSpec().Parent = id

dv.TypedSpec().Size = nested.ProbedSize

if dv.TypedSpec().Size == 0 {
dv.TypedSpec().Size = nested.PartitionSize
}

dv.TypedSpec().SectorSize = info.SectorSize
dv.TypedSpec().IOSize = info.IOSize

Expand Down Expand Up @@ -205,7 +227,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim

return nil
}); err != nil {
return fmt.Errorf("failed to write discovered volume: %w", err)
return nil, fmt.Errorf("failed to write discovered volume: %w", err)
}

touchedIDs[partID] = struct{}{}
Expand All @@ -215,7 +237,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
// clean up discovered volumes
discoveredVolumes, err := safe.ReaderListAll[*block.DiscoveredVolume](ctx, r)
if err != nil {
return fmt.Errorf("failed to list discovered volumes: %w", err)
return nil, fmt.Errorf("failed to list discovered volumes: %w", err)
}

for iterator := discoveredVolumes.Iterator(); iterator.Next(); {
Expand All @@ -238,12 +260,12 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
if isFailed || parentTouched {
// if the probe failed, or if the parent was touched, while this device was not, remove it
if err = r.Destroy(ctx, dv.Metadata()); err != nil {
return fmt.Errorf("failed to destroy discovered volume: %w", err)
return nil, fmt.Errorf("failed to destroy discovered volume: %w", err)
}
}
}

return nil
return nextRescan, nil
}

func (ctrl *DiscoveryController) fillDiscoveredVolumeFromInfo(dv *block.DiscoveredVolume, info blkid.ProbeResult) {
Expand Down
27 changes: 19 additions & 8 deletions internal/integration/api/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ func (suite *VolumesSuite) TearDownTest() {

// TestDiscoveredVolumes verifies that standard Talos partitions are discovered.
func (suite *VolumesSuite) TestDiscoveredVolumes() {
suite.T().Skip("skipping test, as it's flaky (going to address it later)")

if !suite.Capabilities().SupportsVolumes {
suite.T().Skip("cluster doesn't support volumes")
}

node := suite.RandomDiscoveredNodeInternalIP()
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
suite.testDiscoveredVolumes(node)
})
}
}

func (suite *VolumesSuite) testDiscoveredVolumes(node string) {
ctx := client.WithNode(suite.ctx, node)

volumes, err := safe.StateListAll[*block.DiscoveredVolume](ctx, suite.Client.COSI)
Expand All @@ -59,7 +64,9 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() {
expectedVolumes := map[string]struct {
Name string
}{
"META": {},
"META": {
Name: "talosmeta",
},
"STATE": {
Name: "xfs",
},
Expand All @@ -71,12 +78,12 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() {
for iterator := volumes.Iterator(); iterator.Next(); {
dv := iterator.Value()

suite.T().Logf("Volume: %s %s %s %s", dv.Metadata().ID(), dv.TypedSpec().Name, dv.TypedSpec().PartitionLabel, dv.TypedSpec().Label)
suite.T().Logf("volume: %s %s %s %s", dv.Metadata().ID(), dv.TypedSpec().Name, dv.TypedSpec().PartitionLabel, dv.TypedSpec().Label)

partitionLabel := dv.TypedSpec().PartitionLabel
filesystemLabel := dv.TypedSpec().Label

// this is encrypted partition, skip it, we should see another device with actual filesystem
// this is encrypted partition, skip it, we should see another device with the actual filesystem
if dv.TypedSpec().Name == "luks" {
continue
}
Expand All @@ -95,12 +102,16 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() {
}
}

suite.Assert().Equal(expected.Name, dv.TypedSpec().Name)
suite.Assert().Equal(expected.Name, dv.TypedSpec().Name, "node: ", node)

delete(expectedVolumes, id)
}

suite.Assert().Empty(expectedVolumes)
suite.Assert().Empty(expectedVolumes, "node: ", node)

if suite.T().Failed() {
suite.DumpLogs(suite.ctx, node, "controller-runtime", "block.")
}
}

func init() {
Expand Down
29 changes: 29 additions & 0 deletions internal/integration/base/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/cluster"
"github.com/siderolabs/talos/pkg/cluster/check"
"github.com/siderolabs/talos/pkg/machinery/api/common"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/siderolabs/talos/pkg/machinery/client"
clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
Expand Down Expand Up @@ -748,6 +749,34 @@ waitLoop:
}
}

// DumpLogs dumps a set of logs from the node.
func (apiSuite *APISuite) DumpLogs(ctx context.Context, node string, service, pattern string) {
nodeCtx := client.WithNode(ctx, node)

logsStream, err := apiSuite.Client.Logs(
nodeCtx,
constants.SystemContainerdNamespace,
common.ContainerDriver_CONTAINERD,
service,
false,
-1,
)
apiSuite.Require().NoError(err)

logReader, err := client.ReadStream(logsStream)
apiSuite.Require().NoError(err)

defer logReader.Close() //nolint:errcheck

scanner := bufio.NewScanner(logReader)

for scanner.Scan() {
if pattern == "" || strings.Contains(scanner.Text(), pattern) {
apiSuite.T().Logf("%s (%s): %s", node, service, scanner.Text())
}
}
}

// TearDownSuite closes Talos API client.
func (apiSuite *APISuite) TearDownSuite() {
if apiSuite.Client != nil {
Expand Down
21 changes: 21 additions & 0 deletions internal/integration/base/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,27 @@ func (k8sSuite *K8sSuite) WaitForPodToBeRunning(ctx context.Context, timeout tim
}
}

// LogPodLogs logs the logs of the pod with the given namespace and name.
func (k8sSuite *K8sSuite) LogPodLogs(ctx context.Context, namespace, podName string) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

req := k8sSuite.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{})

readCloser, err := req.Stream(ctx)
if err != nil {
k8sSuite.T().Logf("failed to get pod logs: %s", err)
}

defer readCloser.Close() //nolint:errcheck

scanner := bufio.NewScanner(readCloser)

for scanner.Scan() {
k8sSuite.T().Logf("%s/%s: %s", namespace, podName, scanner.Text())
}
}

// WaitForPodToBeDeleted waits for the pod with the given namespace and name to be deleted.
func (k8sSuite *K8sSuite) WaitForPodToBeDeleted(ctx context.Context, timeout time.Duration, namespace, podName string) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
Expand Down
22 changes: 17 additions & 5 deletions internal/integration/k8s/tink.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,13 @@ func (suite *TinkSuite) TestDeploy() {
cpCfgBytes, err := cpCfg.Bytes()
suite.Require().NoError(err)

suite.waitForEndpointReady(talosEndpoint)
readyErr := suite.waitForEndpointReady(talosEndpoint)

if readyErr != nil {
suite.LogPodLogs(ctx, namespace, ss+"-0")
}

suite.Require().NoError(readyErr)

insecureClient, err := client.New(ctx,
client.WithEndpoints(talosEndpoint),
Expand All @@ -182,7 +188,13 @@ func (suite *TinkSuite) TestDeploy() {

suite.T().Logf("talosconfig = %s", string(ensure.Value(talosconfig.Bytes())))

suite.waitForEndpointReady(talosEndpoint)
readyErr = suite.waitForEndpointReady(talosEndpoint)

if readyErr != nil {
suite.LogPodLogs(ctx, namespace, ss+"-0")
}

suite.Require().NoError(readyErr)

talosClient, err := client.New(ctx,
client.WithConfigContext(talosconfig.Contexts[talosconfig.Context]),
Expand Down Expand Up @@ -246,8 +258,8 @@ func (access *tinkClusterAccess) NodesByType(typ machine.Type) []cluster.NodeInf
}
}

func (suite *TinkSuite) waitForEndpointReady(endpoint string) {
suite.Require().NoError(retry.Constant(30*time.Second, retry.WithUnits(10*time.Millisecond)).Retry(func() error {
func (suite *TinkSuite) waitForEndpointReady(endpoint string) error {
return retry.Constant(30*time.Second, retry.WithUnits(10*time.Millisecond)).Retry(func() error {
c, err := tls.Dial("tcp", endpoint,
&tls.Config{
InsecureSkipVerify: true,
Expand All @@ -259,7 +271,7 @@ func (suite *TinkSuite) waitForEndpointReady(endpoint string) {
}

return retry.ExpectedError(err)
}))
})
}

func (suite *TinkSuite) getTinkManifests(namespace, serviceName, ssName, talosImage string) []unstructured.Unstructured {
Expand Down

0 comments on commit 7c9a143

Please sign in to comment.