Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve: Dynamic disk configuration is not correctly provisioned #4

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ func DeployCommand() *coral.Command {
}
var fileName string
cmd.Flags().StringVarP(&fileName, "file", "f", "", "configuration file")
cmd.Flags().StringVarP(&m.DynamicConfigFile, "dynamicFile", "d", "", "dynamic configuration file")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is a dynamic configuration file?

cmd.Flags().StringVarP(&m.User, "user", "u", utils.CurrentUser(), "The user name to login via SSH. The user must has root (or sudo) privilege.")
cmd.Flags().IntVarP(&m.SshPort, "port", "p", 22, "The port to SSH.")
cmd.Flags().StringVarP(&m.IdentityFile, "identity_file", "i", m.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
cmd.Flags().StringVarP(&m.Version, "version", "v", "", "The SeaweedFS version")
cmd.Flags().StringVarP(&m.EnvoyVersion, "envoyVersion", "", "", "Envoy version")
cmd.Flags().StringVarP(&m.ComponentToDeploy, "component", "c", "", "[master|volume|filer|envoy] only install one component")
cmd.Flags().BoolVarP(&m.PrepareVolumeDisks, "mountDisks", "", true, "auto mount disks on volume server if unmounted")
cmd.Flags().BoolVarP(&m.ForceRestart, "restart", "", false, "force to restart the service")
Expand All @@ -48,16 +50,37 @@ func DeployCommand() *coral.Command {
}

fmt.Println(fileName)
spec := &spec.Specification{}
deploySpec := &spec.Specification{}
data, readErr := os.ReadFile(fileName)
if readErr != nil {
return fmt.Errorf("read %s: %v", fileName, readErr)
}
if unmarshalErr := yaml.Unmarshal(data, spec); unmarshalErr != nil {
if unmarshalErr := yaml.Unmarshal(data, deploySpec); unmarshalErr != nil {
return fmt.Errorf("unmarshal %s: %v", fileName, unmarshalErr)
}

return m.DeployCluster(spec)
// Check if DynamicConfig file name is specified and file exists
if len(m.DynamicConfigFile) > 0 {
if _, err := os.Stat(m.DynamicConfigFile); err == nil {
// Try to load dynamic config
data, readErr := os.ReadFile(m.DynamicConfigFile)
if readErr != nil {
return fmt.Errorf("read dynamic config %s: %v", m.DynamicConfigFile, readErr)
}
dList := make(map[string][]spec.FolderSpec)
if unmarshalErr := yaml.Unmarshal(data, dList); unmarshalErr != nil {
return fmt.Errorf("unmarshal dynamic config %s: %v", fileName, unmarshalErr)
}
m.DynamicConfig.DynamicVolumeServers = dList
}
}

// dynamic config file name MUST be specified if prepare volume disks is active
if m.PrepareVolumeDisks && (len(m.DynamicConfigFile) < 1) {
return fmt.Errorf("`--dynamicFile <dynamic_config_file.yml>` should be specified when `--mountDisks` is set to true")
}

return m.DeployCluster(deploySpec)
}

return cmd
Expand Down
188 changes: 128 additions & 60 deletions pkg/cluster/manager/deploy_volume_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,56 @@ import (
"github.com/seaweedfs/seaweed-up/pkg/disks"
"github.com/seaweedfs/seaweed-up/pkg/operator"
"github.com/seaweedfs/seaweed-up/scripts"
"math"
"strings"
"time"
)

func (m *Manager) DeployVolumeServer(masters []string, volumeServerSpec *spec.VolumeServerSpec, index int) error {
func (m *Manager) UpdateDynamicVolumes(ip string, folders []spec.FolderSpec) {
m.DynamicConfig.Lock()
m.DynamicConfig.Changed = true
m.DynamicConfig.DynamicVolumeServers[ip] = folders
m.DynamicConfig.Unlock()
}

func (m *Manager) GetDynamicVolumes(ip string) []spec.FolderSpec {
m.DynamicConfig.Lock()
defer m.DynamicConfig.Unlock()

if m, ok := m.DynamicConfig.DynamicVolumeServers[ip]; ok {
return m
}
return []spec.FolderSpec{}
}

func (m *Manager) DeployVolumeServer(masters []string, globalOptions spec.GlobalOptions, volumeServerSpec *spec.VolumeServerSpec, index int) error {
return operator.ExecuteRemote(fmt.Sprintf("%s:%d", volumeServerSpec.Ip, volumeServerSpec.PortSsh), m.User, m.IdentityFile, m.sudoPass, func(op operator.CommandOperator) error {

component := "volume"
componentInstance := fmt.Sprintf("%s%d", component, index)
var buf bytes.Buffer
volumeServerSpec.WriteToBuffer(masters, &buf)

// Prepare dynamic folders
dynamicFolders := m.GetDynamicVolumes(volumeServerSpec.Ip)
if m.PrepareVolumeDisks {
if err := m.prepareUnmountedDisks(op); err != nil {
err, changed := m.prepareUnmountedDisks(op, &dynamicFolders, globalOptions.VolumeSizeLimitMB)
if err != nil {
return fmt.Errorf("prepare disks: %v", err)
}
if changed {
// Pass change info into upper layer
m.UpdateDynamicVolumes(volumeServerSpec.Ip, dynamicFolders)
}
}

// Update server specification for current server
for _, fld := range dynamicFolders {
flx := fld
volumeServerSpec.Folders = append(volumeServerSpec.Folders, &flx)
}

var buf bytes.Buffer
volumeServerSpec.WriteToBuffer(masters, &buf)

return m.deployComponentInstance(op, component, componentInstance, &buf)

})
Expand Down Expand Up @@ -59,95 +92,130 @@ func (m *Manager) StopVolumeServer(volumeServerSpec *spec.VolumeServerSpec, inde
})
}

func (m *Manager) prepareUnmountedDisks(op operator.CommandOperator) error {
func (m *Manager) prepareUnmountedDisks(op operator.CommandOperator, dynamicFolders *[]spec.FolderSpec, volumeSizeLimitMB int) (err error, changed bool) {
println("prepareUnmountedDisks...")
devices, mountpoints, err := disks.ListBlockDevices(op, []string{"/dev/sd", "/dev/nvme"})
if err != nil {
return fmt.Errorf("list device: %v", err)
return fmt.Errorf("list device: %v", err), false
}
fmt.Printf("mountpoints: %+v\n", mountpoints)

disks := make(map[string]*disks.BlockDevice)
diskList := make(map[string]*disks.BlockDevice)

// find all disks
// find all diskList
for _, dev := range devices {
if dev.Type == "disk" {
disks[dev.Path] = dev
diskList[dev.Path] = dev
}
}

fmt.Printf("disks0: %+v\n", disks)
fmt.Printf("All devices: %+v\n", diskList)

// remove disks already has partitions
// remove diskList already has partitions
for _, dev := range devices {
if dev.Type == "part" {
for parentPath, _ := range disks {
for parentPath, _ := range diskList {
if strings.HasPrefix(dev.Path, parentPath) {
// the disk is already partitioned
delete(disks, parentPath)
delete(diskList, parentPath)
}
}
}
}
fmt.Printf("disks1: %+v\n", disks)
fmt.Printf("Devices without partition: %+v\n", diskList)

// remove already has mount point
for k, dev := range disks {
for k, dev := range diskList {
if dev.MountPoint != "" {
delete(disks, k)
delete(diskList, k)
} else if dev.FilesystemType != "" {
delete(diskList, k)
}
}
fmt.Printf("disks2: %+v\n", disks)

// format disk if no fstype
for _, dev := range disks {
if dev.FilesystemType == "" {
info("mkfs " + dev.Path)
if err := m.sudo(op, fmt.Sprintf("mkfs.ext4 %s", dev.Path)); err != nil {
return fmt.Errorf("create file system on %s: %v", dev.Path, err)
}
fmt.Printf("Devices without mountpoint and filesystem: %+v\n", diskList)

// Process all unused RAW devices
for _, dev := range diskList {
// format disk
info("mkfs " + dev.Path)
if err := m.sudo(op, fmt.Sprintf("mkfs.ext4 %s", dev.Path)); err != nil {
return fmt.Errorf("create file system on %s: %v", dev.Path, err), changed
}
}

// mount them
for _, dev := range disks {
if dev.MountPoint == "" {
var targetMountPoint = ""
for i := 1; i < 100; i++ {
t := fmt.Sprintf("/data%d", i)
if _, found := mountpoints[t]; found {
continue
}
targetMountPoint = t
mountpoints[t] = struct{}{}
break
}
if targetMountPoint == "" {
return fmt.Errorf("no good mount point")
}
// Wait 2 sec for kernel data sync
time.Sleep(2 * time.Second)

data := map[string]interface{}{
"DevicePath": dev.Path,
"MountPoint": targetMountPoint,
}
prepareScript, err := scripts.RenderScript("prepare_disk.sh", data)
if err != nil {
return err
}
info("Installing mount_" + dev.DeviceName + ".sh")
err = op.Upload(prepareScript, fmt.Sprintf("/tmp/mount_%s.sh", dev.DeviceName), "0755")
if err != nil {
return fmt.Errorf("error received during upload mount script: %s", err)
}
// Get UUID
uuid, err := disks.GetDiskUUID(op, dev.Path)
if err != nil {
return fmt.Errorf("get disk UUID on %s: %v", dev.Path, err), changed
}

info("mount " + dev.DeviceName + "...")
err = op.Execute(fmt.Sprintf("cat /tmp/mount_%s.sh | SUDO_PASS=\"%s\" sh -\n", dev.DeviceName, m.sudoPass))
if err != nil {
return fmt.Errorf("error received during mount: %s", err)
if uuid == "" {
return fmt.Errorf("get empty disk UUID for %s", dev.Path), changed
}

diskList[dev.Path].UUID = uuid
dev.UUID = uuid

fmt.Printf("* disk [%s] UUID: [%s]", dev.Path, dev.UUID)

// Allocate spare mountpoint
var targetMountPoint = ""
for i := 1; i < 100; i++ {
t := fmt.Sprintf("/data%d", i)
if _, found := mountpoints[t]; found {
continue
}
targetMountPoint = t
mountpoints[t] = struct{}{}
break
}
if targetMountPoint == "" {
return fmt.Errorf("no good mount point"), changed
}

data := map[string]interface{}{
"DevicePath": dev.Path,
"DeviceUUID": dev.UUID,
"MountPoint": targetMountPoint,
}
prepareScript, err := scripts.RenderScript("prepare_disk.sh", data)
if err != nil {
return err, changed
}
info("Installing mount_" + dev.DeviceName + ".sh")
err = op.Upload(prepareScript, fmt.Sprintf("/tmp/mount_%s.sh", dev.DeviceName), "0755")
if err != nil {
return fmt.Errorf("error received during upload mount script: %s", err), changed
}

info("mount " + dev.DeviceName + " with UUID " + dev.UUID + " into " + targetMountPoint + "...")
err = op.Execute(fmt.Sprintf("cat /tmp/mount_%s.sh | SUDO_PASS=\"%s\" sh -\n", dev.DeviceName, m.sudoPass))
if err != nil {
return fmt.Errorf("error received during mount %s with UUID %s: %s", dev.DeviceName, dev.UUID, err), changed
}

// Max calculation: reserve min(5%, 10Gb)
usableSizeMb := dev.Size / 1024 / 1024
if usableSizeMb > 200*1024 {
usableSizeMb -= 10 * 1024
} else {
usableSizeMb = uint64(int(math.Floor(float64(usableSizeMb) * 0.95)))
}

// New volume is mounted, provision it into dynamicFolders
folderSpec := spec.FolderSpec{
Folder: targetMountPoint,
DiskType: "hdd",
BlockDevice: dev.Path,
UUID: dev.UUID,
Max: int(math.Floor(float64(usableSizeMb) / float64(volumeSizeLimitMB))),
}

*dynamicFolders = append(*dynamicFolders, folderSpec)
changed = true
}

return nil
return nil, changed
}
24 changes: 19 additions & 5 deletions pkg/cluster/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@ package manager

import (
"fmt"
"github.com/seaweedfs/seaweed-up/pkg/cluster/spec"
"github.com/seaweedfs/seaweed-up/pkg/operator"
"sync"
)

type MDynamicConfig struct {
sync.RWMutex
Changed bool
DynamicVolumeServers map[string][]spec.FolderSpec
}

type Manager struct {
User string // username to login to the SSH server
IdentityFile string // path to the private key file
UsePassword bool // use password instead of identity file for ssh connection
ProxyUrl string // proxy URL for binary download
ComponentToDeploy string
Version string
Version string // seaweed version
EnvoyVersion string // envoy version
SshPort int
PrepareVolumeDisks bool
DynamicConfigFile string // filename of dynamic configuration file
DynamicConfig MDynamicConfig // dynamic configuration
ForceRestart bool

skipConfig bool
Expand All @@ -31,6 +42,9 @@ func NewManager() *Manager {
skipStart: false,
Version: "",
sudoPass: "",
DynamicConfig: MDynamicConfig{
DynamicVolumeServers: make(map[string][]spec.FolderSpec),
},
}
}

Expand All @@ -39,10 +53,10 @@ func info(message string) {
}

func (m *Manager) sudo(op operator.CommandOperator, cmd string) error {
info("[execute] " + cmd)
if m.sudoPass == "" {
return op.Execute(cmd)
}
info("[execute sudo] " + cmd)
//if m.sudoPass == "" {
// return op.Execute(cmd)
//}
defer fmt.Println()
return op.Execute(fmt.Sprintf("echo '%s' | sudo -S %s", m.sudoPass, cmd))
}
Loading