From a9d05e19d1c97509888e384fc2d796b3b904daad Mon Sep 17 00:00:00 2001 From: Vitaliy Ponomarev Date: Tue, 28 Mar 2023 18:09:44 +0300 Subject: [PATCH 1/2] Fixed issues: 1. Non root user with passwordless sudo wasn't handled correctly 2. Minor issue in systemd service fixed 3. Added option `--dynamicFile ` for dynamic disk provisioning 4. Fixed automatic disk mount feature - disks are provisioned, info is saved in dynamic config. Redeploy will not lose disk information. --- cmd/deploy.go | 29 +++- pkg/cluster/manager/deploy_volume_server.go | 179 +++++++++++++------- pkg/cluster/manager/manager.go | 24 ++- pkg/cluster/manager/manager_deploy.go | 41 +++-- pkg/cluster/spec/spec.go | 13 +- pkg/cluster/spec/volume_server_spec.go | 8 +- pkg/disks/disks.go | 17 +- scripts/install.sh | 4 +- scripts/install_envoy.sh | 4 +- scripts/prepare_disk.sh | 7 +- 10 files changed, 232 insertions(+), 94 deletions(-) diff --git a/cmd/deploy.go b/cmd/deploy.go index bb9e579..9be91cd 100644 --- a/cmd/deploy.go +++ b/cmd/deploy.go @@ -28,10 +28,12 @@ func DeployCommand() *coral.Command { } var fileName string cmd.Flags().StringVarP(&fileName, "file", "f", "", "configuration file") + cmd.Flags().StringVarP(&m.DynamicConfigFile, "dynamicFile", "d", "", "dynamic configuration file") cmd.Flags().StringVarP(&m.User, "user", "u", utils.CurrentUser(), "The user name to login via SSH. The user must has root (or sudo) privilege.") cmd.Flags().IntVarP(&m.SshPort, "port", "p", 22, "The port to SSH.") cmd.Flags().StringVarP(&m.IdentityFile, "identity_file", "i", m.IdentityFile, "The path of the SSH identity file. 
If specified, public key authentication will be used.") cmd.Flags().StringVarP(&m.Version, "version", "v", "", "The SeaweedFS version") + cmd.Flags().StringVarP(&m.EnvoyVersion, "envoyVersion", "", "", "Envoy version") cmd.Flags().StringVarP(&m.ComponentToDeploy, "component", "c", "", "[master|volume|filer|envoy] only install one component") cmd.Flags().BoolVarP(&m.PrepareVolumeDisks, "mountDisks", "", true, "auto mount disks on volume server if unmounted") cmd.Flags().BoolVarP(&m.ForceRestart, "restart", "", false, "force to restart the service") @@ -48,16 +50,37 @@ func DeployCommand() *coral.Command { } fmt.Println(fileName) - spec := &spec.Specification{} + deploySpec := &spec.Specification{} data, readErr := os.ReadFile(fileName) if readErr != nil { return fmt.Errorf("read %s: %v", fileName, readErr) } - if unmarshalErr := yaml.Unmarshal(data, spec); unmarshalErr != nil { + if unmarshalErr := yaml.Unmarshal(data, deploySpec); unmarshalErr != nil { return fmt.Errorf("unmarshal %s: %v", fileName, unmarshalErr) } - return m.DeployCluster(spec) + // Check if DynamicConfig file name is specified and file exists + if len(m.DynamicConfigFile) > 0 { + if _, err := os.Stat(m.DynamicConfigFile); err == nil { + // Try to load dynamic config + data, readErr := os.ReadFile(m.DynamicConfigFile) + if readErr != nil { + return fmt.Errorf("read dynamic config %s: %v", m.DynamicConfigFile, readErr) + } + dList := make(map[string][]spec.FolderSpec) + if unmarshalErr := yaml.Unmarshal(data, dList); unmarshalErr != nil { + return fmt.Errorf("unmarshal dynamic config %s: %v", fileName, unmarshalErr) + } + m.DynamicConfig.DynamicVolumeServers = dList + } + } + + // dynamic config file name MUST be specified if prepare volume disks is active + if m.PrepareVolumeDisks && (len(m.DynamicConfigFile) < 1) { + return fmt.Errorf("`--dynamicFile ` should be specified when `--mountDisks` is set to true") + } + + return m.DeployCluster(deploySpec) } return cmd diff --git 
a/pkg/cluster/manager/deploy_volume_server.go b/pkg/cluster/manager/deploy_volume_server.go index 4c205e1..8f761ff 100644 --- a/pkg/cluster/manager/deploy_volume_server.go +++ b/pkg/cluster/manager/deploy_volume_server.go @@ -8,22 +8,54 @@ import ( "github.com/seaweedfs/seaweed-up/pkg/operator" "github.com/seaweedfs/seaweed-up/scripts" "strings" + "time" ) +func (m *Manager) UpdateDynamicVolumes(ip string, folders []spec.FolderSpec) { + m.DynamicConfig.Lock() + m.DynamicConfig.Changed = true + m.DynamicConfig.DynamicVolumeServers[ip] = folders + m.DynamicConfig.Unlock() +} + +func (m *Manager) GetDynamicVolumes(ip string) []spec.FolderSpec { + m.DynamicConfig.Lock() + defer m.DynamicConfig.Unlock() + + if m, ok := m.DynamicConfig.DynamicVolumeServers[ip]; ok { + return m + } + return []spec.FolderSpec{} +} + func (m *Manager) DeployVolumeServer(masters []string, volumeServerSpec *spec.VolumeServerSpec, index int) error { return operator.ExecuteRemote(fmt.Sprintf("%s:%d", volumeServerSpec.Ip, volumeServerSpec.PortSsh), m.User, m.IdentityFile, m.sudoPass, func(op operator.CommandOperator) error { component := "volume" componentInstance := fmt.Sprintf("%s%d", component, index) - var buf bytes.Buffer - volumeServerSpec.WriteToBuffer(masters, &buf) + // Prepare dynamic folders + dynamicFolders := m.GetDynamicVolumes(volumeServerSpec.Ip) if m.PrepareVolumeDisks { - if err := m.prepareUnmountedDisks(op); err != nil { + err, changed := m.prepareUnmountedDisks(op, &dynamicFolders) + if err != nil { return fmt.Errorf("prepare disks: %v", err) } + if changed { + // Pass change info into upper layer + m.UpdateDynamicVolumes(volumeServerSpec.Ip, dynamicFolders) + } } + // Update server specification for current server + for _, fld := range dynamicFolders { + flx := fld + volumeServerSpec.Folders = append(volumeServerSpec.Folders, &flx) + } + + var buf bytes.Buffer + volumeServerSpec.WriteToBuffer(masters, &buf) + return m.deployComponentInstance(op, component, 
componentInstance, &buf) }) @@ -59,95 +91,124 @@ func (m *Manager) StopVolumeServer(volumeServerSpec *spec.VolumeServerSpec, inde }) } -func (m *Manager) prepareUnmountedDisks(op operator.CommandOperator) error { +func (m *Manager) prepareUnmountedDisks(op operator.CommandOperator, dynamicFolders *[]spec.FolderSpec) (err error, changed bool) { println("prepareUnmountedDisks...") devices, mountpoints, err := disks.ListBlockDevices(op, []string{"/dev/sd", "/dev/nvme"}) if err != nil { - return fmt.Errorf("list device: %v", err) + return fmt.Errorf("list device: %v", err), false } fmt.Printf("mountpoints: %+v\n", mountpoints) - disks := make(map[string]*disks.BlockDevice) + diskList := make(map[string]*disks.BlockDevice) - // find all disks + // find all diskList for _, dev := range devices { if dev.Type == "disk" { - disks[dev.Path] = dev + diskList[dev.Path] = dev } } - fmt.Printf("disks0: %+v\n", disks) + fmt.Printf("All devices: %+v\n", diskList) - // remove disks already has partitions + // remove diskList already has partitions for _, dev := range devices { if dev.Type == "part" { - for parentPath, _ := range disks { + for parentPath, _ := range diskList { if strings.HasPrefix(dev.Path, parentPath) { // the disk is already partitioned - delete(disks, parentPath) + delete(diskList, parentPath) } } } } - fmt.Printf("disks1: %+v\n", disks) + fmt.Printf("Devices without partition: %+v\n", diskList) // remove already has mount point - for k, dev := range disks { + for k, dev := range diskList { if dev.MountPoint != "" { - delete(disks, k) + delete(diskList, k) + } else if dev.FilesystemType != "" { + delete(diskList, k) } } - fmt.Printf("disks2: %+v\n", disks) - - // format disk if no fstype - for _, dev := range disks { - if dev.FilesystemType == "" { - info("mkfs " + dev.Path) - if err := m.sudo(op, fmt.Sprintf("mkfs.ext4 %s", dev.Path)); err != nil { - return fmt.Errorf("create file system on %s: %v", dev.Path, err) - } + fmt.Printf("Devices without mountpoint and 
filesystem: %+v\n", diskList) + + // Process all unused RAW devices + for _, dev := range diskList { + // format disk + info("mkfs " + dev.Path) + if err := m.sudo(op, fmt.Sprintf("mkfs.ext4 %s", dev.Path)); err != nil { + return fmt.Errorf("create file system on %s: %v", dev.Path, err), changed } - } - // mount them - for _, dev := range disks { - if dev.MountPoint == "" { - var targetMountPoint = "" - for i := 1; i < 100; i++ { - t := fmt.Sprintf("/data%d", i) - if _, found := mountpoints[t]; found { - continue - } - targetMountPoint = t - mountpoints[t] = struct{}{} - break - } - if targetMountPoint == "" { - return fmt.Errorf("no good mount point") - } + // Wait 2 sec for kernel data sync + time.Sleep(2 * time.Second) - data := map[string]interface{}{ - "DevicePath": dev.Path, - "MountPoint": targetMountPoint, - } - prepareScript, err := scripts.RenderScript("prepare_disk.sh", data) - if err != nil { - return err - } - info("Installing mount_" + dev.DeviceName + ".sh") - err = op.Upload(prepareScript, fmt.Sprintf("/tmp/mount_%s.sh", dev.DeviceName), "0755") - if err != nil { - return fmt.Errorf("error received during upload mount script: %s", err) - } + // Get UUID + uuid, err := disks.GetDiskUUID(op, dev.Path) + if err != nil { + return fmt.Errorf("get disk UUID on %s: %v", dev.Path, err), changed + } - info("mount " + dev.DeviceName + "...") - err = op.Execute(fmt.Sprintf("cat /tmp/mount_%s.sh | SUDO_PASS=\"%s\" sh -\n", dev.DeviceName, m.sudoPass)) - if err != nil { - return fmt.Errorf("error received during mount: %s", err) + if uuid == "" { + return fmt.Errorf("get empty disk UUID for %s", dev.Path), changed + } + + diskList[dev.Path].UUID = uuid + dev.UUID = uuid + + fmt.Printf("* disk [%s] UUID: [%s]", dev.Path, dev.UUID) + + // Allocate spare mountpoint + var targetMountPoint = "" + for i := 1; i < 100; i++ { + t := fmt.Sprintf("/data%d", i) + if _, found := mountpoints[t]; found { + continue } + targetMountPoint = t + mountpoints[t] = struct{}{} + 
break + } + if targetMountPoint == "" { + return fmt.Errorf("no good mount point"), changed + } + + data := map[string]interface{}{ + "DevicePath": dev.Path, + "DeviceUUID": dev.UUID, + "MountPoint": targetMountPoint, + } + prepareScript, err := scripts.RenderScript("prepare_disk.sh", data) + if err != nil { + return err, changed + } + info("Installing mount_" + dev.DeviceName + ".sh") + err = op.Upload(prepareScript, fmt.Sprintf("/tmp/mount_%s.sh", dev.DeviceName), "0755") + if err != nil { + return fmt.Errorf("error received during upload mount script: %s", err), changed + } + + info("mount " + dev.DeviceName + " with UUID " + dev.UUID + " into " + targetMountPoint + "...") + err = op.Execute(fmt.Sprintf("cat /tmp/mount_%s.sh | SUDO_PASS=\"%s\" sh -\n", dev.DeviceName, m.sudoPass)) + if err != nil { + return fmt.Errorf("error received during mount %s with UUID %s: %s", dev.DeviceName, dev.UUID, err), changed + } + // New volume is mounted, provision it into dynamicFolders + folderSpec := spec.FolderSpec{ + Folder: targetMountPoint, + DiskType: "hdd", + BlockDevice: dev.Path, + UUID: dev.UUID, } + + // TODO: calculate Max + folderSpec.Max = 15 + + *dynamicFolders = append(*dynamicFolders, folderSpec) + changed = true } - return nil + return nil, changed } diff --git a/pkg/cluster/manager/manager.go b/pkg/cluster/manager/manager.go index f654418..a9c76c8 100644 --- a/pkg/cluster/manager/manager.go +++ b/pkg/cluster/manager/manager.go @@ -2,18 +2,29 @@ package manager import ( "fmt" + "github.com/seaweedfs/seaweed-up/pkg/cluster/spec" "github.com/seaweedfs/seaweed-up/pkg/operator" + "sync" ) +type MDynamicConfig struct { + sync.RWMutex + Changed bool + DynamicVolumeServers map[string][]spec.FolderSpec +} + type Manager struct { User string // username to login to the SSH server IdentityFile string // path to the private key file UsePassword bool // use password instead of identity file for ssh connection ProxyUrl string // proxy URL for binary download 
ComponentToDeploy string - Version string + Version string // seaweed version + EnvoyVersion string // envoy version SshPort int PrepareVolumeDisks bool + DynamicConfigFile string // filename of dynamic configuration file + DynamicConfig MDynamicConfig // dynamic configuration ForceRestart bool skipConfig bool @@ -31,6 +42,9 @@ func NewManager() *Manager { skipStart: false, Version: "", sudoPass: "", + DynamicConfig: MDynamicConfig{ + DynamicVolumeServers: make(map[string][]spec.FolderSpec), + }, } } @@ -39,10 +53,10 @@ func info(message string) { } func (m *Manager) sudo(op operator.CommandOperator, cmd string) error { - info("[execute] " + cmd) - if m.sudoPass == "" { - return op.Execute(cmd) - } + info("[execute sudo] " + cmd) + //if m.sudoPass == "" { + // return op.Execute(cmd) + //} defer fmt.Println() return op.Execute(fmt.Sprintf("echo '%s' | sudo -S %s", m.sudoPass, cmd)) } diff --git a/pkg/cluster/manager/manager_deploy.go b/pkg/cluster/manager/manager_deploy.go index c91c2d2..b03b79f 100644 --- a/pkg/cluster/manager/manager_deploy.go +++ b/pkg/cluster/manager/manager_deploy.go @@ -11,6 +11,8 @@ import ( "github.com/seaweedfs/seaweed-up/pkg/utils" "github.com/seaweedfs/seaweed-up/scripts" "github.com/thanhpk/randstr" + "gopkg.in/yaml.v3" + "io/ioutil" "sync" ) @@ -40,14 +42,30 @@ func (m *Manager) DeployCluster(specification *spec.Specification) error { if m.shouldInstall("volume") { for index, volumeSpec := range specification.VolumeServers { - wg.Add(1) - go func(index int, volumeSpec *spec.VolumeServerSpec) { - defer wg.Done() - if err := m.DeployVolumeServer(masters, volumeSpec, index); err != nil { - deployErrors = append(deployErrors, fmt.Errorf("deploy to volume server %s:%d :%v", volumeSpec.Ip, volumeSpec.PortSsh, err)) + // wg.Add(1) + // go func(index int, volumeSpec *spec.VolumeServerSpec) { + // defer wg.Done() + if err := m.DeployVolumeServer(masters, volumeSpec, index); err != nil { + deployErrors = append(deployErrors, fmt.Errorf("deploy to 
volume server %s:%d :%v", volumeSpec.Ip, volumeSpec.PortSsh, err)) + } + // }(index, volumeSpec) + } + + // Update dynamic volume server specification + m.DynamicConfig.Lock() + if m.DynamicConfig.Changed { + // TODO: Save changes to file + b, err := yaml.Marshal(m.DynamicConfig.DynamicVolumeServers) + if err != nil { + deployErrors = append(deployErrors, fmt.Errorf("error yaml marshal for Dynamic Volume Servers: %v", err)) + } else { + err = ioutil.WriteFile(m.DynamicConfigFile, b, 0666) + if err != nil { + deployErrors = append(deployErrors, fmt.Errorf("error writing to %s with Dynamic Volume Servers: %v", m.DynamicConfigFile, err)) } - }(index, volumeSpec) + } } + m.DynamicConfig.Unlock() } if m.shouldInstall("filer") { for index, filerSpec := range specification.FilerServers { @@ -66,12 +84,15 @@ func (m *Manager) DeployCluster(specification *spec.Specification) error { } if m.shouldInstall("envoy") { - latest, err := config.GitHubLatestRelease(context.Background(), "0", "envoyproxy", "envoy") - if err != nil { - return errors.Wrapf(err, "unable to get latest version number, define a version manually with the --version flag") + if m.EnvoyVersion == "" { + latest, err := config.GitHubLatestRelease(context.Background(), "0", "envoyproxy", "envoy") + if err != nil { + return errors.Wrapf(err, "unable to get latest version number, define a version manually with the --version flag") + } + m.EnvoyVersion = latest.Version } for index, envoySpec := range specification.EnvoyServers { - envoySpec.Version = utils.Nvl(envoySpec.Version, latest.Version) + envoySpec.Version = utils.Nvl(envoySpec.Version, m.EnvoyVersion) if err := m.DeployEnvoyServer(specification.FilerServers, envoySpec, index); err != nil { return fmt.Errorf("deploy to envoy server %s:%d :%v", envoySpec.Ip, envoySpec.PortSsh, err) } diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 924e4d6..956c423 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -19,11 +19,12 @@ 
type ( } Specification struct { - GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` - ServerConfigs ServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` - MasterServers []*MasterServerSpec `yaml:"master_servers"` - VolumeServers []*VolumeServerSpec `yaml:"volume_servers"` - FilerServers []*FilerServerSpec `yaml:"filer_servers"` - EnvoyServers []*EnvoyServerSpec `yaml:"envoy_servers"` + GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` + ServerConfigs ServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` + MasterServers []*MasterServerSpec `yaml:"master_servers"` + VolumeServers []*VolumeServerSpec `yaml:"volume_servers"` + FilerServers []*FilerServerSpec `yaml:"filer_servers"` + EnvoyServers []*EnvoyServerSpec `yaml:"envoy_servers"` + DynamicVolumeServers []*VolumeServerSpec `yaml:"dynamic_volume_servers,omitempty"` } ) diff --git a/pkg/cluster/spec/volume_server_spec.go b/pkg/cluster/spec/volume_server_spec.go index 75536a0..0a158a0 100644 --- a/pkg/cluster/spec/volume_server_spec.go +++ b/pkg/cluster/spec/volume_server_spec.go @@ -24,9 +24,11 @@ type VolumeServerSpec struct { OS string `yaml:"os,omitempty"` } type FolderSpec struct { - Folder string `yaml:"folder"` - DiskType string `yaml:"disk" default:"hdd"` - Max int `yaml:"max,omitempty"` + Folder string `yaml:"folder"` + DiskType string `yaml:"disk" default:"hdd"` + Max int `yaml:"max,omitempty"` + BlockDevice string `yaml:"blockDevice,omitempty"` + UUID string `yaml:"uuid,omitempty"` } func (vs *VolumeServerSpec) WriteToBuffer(masters []string, buf *bytes.Buffer) { diff --git a/pkg/disks/disks.go b/pkg/disks/disks.go index c40a8f0..4feeee3 100644 --- a/pkg/disks/disks.go +++ b/pkg/disks/disks.go @@ -3,6 +3,7 @@ package disks import ( "bufio" "bytes" + "fmt" "github.com/seaweedfs/seaweed-up/pkg/operator" "regexp" "strconv" @@ -22,6 +23,20 @@ type BlockDevice struct { Type string } +func 
GetDiskUUID(op operator.CommandOperator, devName string) (UUID string, err error) { + devices, _, err := ListBlockDevices(op, []string{devName}) + + if err != nil { + return "", err + } + for _, dev := range devices { + if dev.Path == devName { + return dev.UUID, nil + } + } + return "", fmt.Errorf("uuid not found") +} + func ListBlockDevices(op operator.CommandOperator, prefixes []string) (output []*BlockDevice, mountpoints map[string]struct{}, err error) { mountpoints = make(map[string]struct{}) out, err := op.Output( @@ -65,7 +80,7 @@ func ListBlockDevices(op operator.CommandOperator, prefixes []string) (output [] for _, prefix := range prefixes { if strings.HasPrefix(dev.Path, prefix) { hasValidPrefix = true - println("valid path", dev.Path) + //println("valid path", dev.Path) break } } diff --git a/scripts/install.sh b/scripts/install.sh index 09c8a7c..198bfa8 100644 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -151,6 +151,8 @@ Description=Seaweed${COMPONENT_INSTANCE} Documentation=https://github.com/seaweedfs/seaweedfs/wiki Wants=network-online.target After=network-online.target +StartLimitBurst=3 +StartLimitIntervalSec=10 [Service] WorkingDirectory=${SEAWEED_COMPONENT_INSTANCE_DATA_DIR} @@ -162,8 +164,6 @@ LimitNOFILE=infinity LimitNPROC=infinity Restart=on-failure RestartSec=2 -StartLimitBurst=3 -StartLimitIntervalSec=10 TasksMax=infinity [Install] diff --git a/scripts/install_envoy.sh b/scripts/install_envoy.sh index b9823c0..d20b496 100644 --- a/scripts/install_envoy.sh +++ b/scripts/install_envoy.sh @@ -144,6 +144,8 @@ Description=Seaweed${COMPONENT_INSTANCE} Documentation=https://github.com/seaweedfs/seaweedfs/wiki Wants=network-online.target After=network-online.target +StartLimitBurst=3 +StartLimitIntervalSec=10 [Service] WorkingDirectory=${SEAWEED_COMPONENT_INSTANCE_DATA_DIR} @@ -155,8 +157,6 @@ LimitNOFILE=infinity LimitNPROC=infinity Restart=on-failure RestartSec=2 -StartLimitBurst=3 -StartLimitIntervalSec=10 TasksMax=infinity [Install] 
diff --git a/scripts/prepare_disk.sh b/scripts/prepare_disk.sh index 25dee89..39f5877 100644 --- a/scripts/prepare_disk.sh +++ b/scripts/prepare_disk.sh @@ -27,6 +27,7 @@ setup_env() { MOUNT_POINT={{.MountPoint}} DEVICE_PATH={{.DevicePath}} + DEVICE_UUID={{.DeviceUUID}} } setup_mount() { @@ -34,9 +35,9 @@ setup_mount() { info "Setup Mount Point" $SUDO mkdir -p -m 755 ${MOUNT_POINT} info "add ${DEVICE_PATH} ${MOUNT_POINT} to fstab" - echo "${DEVICE_PATH} ${MOUNT_POINT} ext4 noatime 0 2" | $SUDO tee -a /etc/fstab - info "mount ${DEVICE_PATH} ${MOUNT_POINT}" - $SUDO mount ${DEVICE_PATH} ${MOUNT_POINT} + echo "UUID=\"${DEVICE_UUID}\" ${MOUNT_POINT} ext4 noatime 0 2" | $SUDO tee -a /etc/fstab + info "mount ${DEVICE_PATH} with UUID=${DEVICE_UUID} at ${MOUNT_POINT}" + $SUDO mount ${MOUNT_POINT} return 0 } From 324bdb970304769546461624a412bd2919a47467 Mon Sep 17 00:00:00 2001 From: Vitaliy Ponomarev Date: Wed, 29 Mar 2023 01:57:55 +0300 Subject: [PATCH 2/2] Calculate correct value for `Max` --- pkg/cluster/manager/deploy_volume_server.go | 19 +++++++++++++------ pkg/cluster/manager/manager_deploy.go | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/cluster/manager/deploy_volume_server.go b/pkg/cluster/manager/deploy_volume_server.go index 8f761ff..62f0dc7 100644 --- a/pkg/cluster/manager/deploy_volume_server.go +++ b/pkg/cluster/manager/deploy_volume_server.go @@ -7,6 +7,7 @@ import ( "github.com/seaweedfs/seaweed-up/pkg/disks" "github.com/seaweedfs/seaweed-up/pkg/operator" "github.com/seaweedfs/seaweed-up/scripts" + "math" "strings" "time" ) @@ -28,7 +29,7 @@ func (m *Manager) GetDynamicVolumes(ip string) []spec.FolderSpec { return []spec.FolderSpec{} } -func (m *Manager) DeployVolumeServer(masters []string, volumeServerSpec *spec.VolumeServerSpec, index int) error { +func (m *Manager) DeployVolumeServer(masters []string, globalOptions spec.GlobalOptions, volumeServerSpec *spec.VolumeServerSpec, index int) error { return 
operator.ExecuteRemote(fmt.Sprintf("%s:%d", volumeServerSpec.Ip, volumeServerSpec.PortSsh), m.User, m.IdentityFile, m.sudoPass, func(op operator.CommandOperator) error { component := "volume" @@ -37,7 +38,7 @@ func (m *Manager) DeployVolumeServer(masters []string, volumeServerSpec *spec.Vo // Prepare dynamic folders dynamicFolders := m.GetDynamicVolumes(volumeServerSpec.Ip) if m.PrepareVolumeDisks { - err, changed := m.prepareUnmountedDisks(op, &dynamicFolders) + err, changed := m.prepareUnmountedDisks(op, &dynamicFolders, globalOptions.VolumeSizeLimitMB) if err != nil { return fmt.Errorf("prepare disks: %v", err) } @@ -91,7 +92,7 @@ func (m *Manager) StopVolumeServer(volumeServerSpec *spec.VolumeServerSpec, inde }) } -func (m *Manager) prepareUnmountedDisks(op operator.CommandOperator, dynamicFolders *[]spec.FolderSpec) (err error, changed bool) { +func (m *Manager) prepareUnmountedDisks(op operator.CommandOperator, dynamicFolders *[]spec.FolderSpec, volumeSizeLimitMB int) (err error, changed bool) { println("prepareUnmountedDisks...") devices, mountpoints, err := disks.ListBlockDevices(op, []string{"/dev/sd", "/dev/nvme"}) if err != nil { @@ -195,17 +196,23 @@ func (m *Manager) prepareUnmountedDisks(op operator.CommandOperator, dynamicFold return fmt.Errorf("error received during mount %s with UUID %s: %s", dev.DeviceName, dev.UUID, err), changed } + // Max calculation: reserve min(5%, 10Gb) + usableSizeMb := dev.Size / 1024 / 1024 + if usableSizeMb > 200*1024 { + usableSizeMb -= 10 * 1024 + } else { + usableSizeMb = uint64(int(math.Floor(float64(usableSizeMb) * 0.95))) + } + // New volume is mounted, provision it into dynamicFolders folderSpec := spec.FolderSpec{ Folder: targetMountPoint, DiskType: "hdd", BlockDevice: dev.Path, UUID: dev.UUID, + Max: int(math.Floor(float64(usableSizeMb) / float64(volumeSizeLimitMB))), } - // TODO: calculate Max - folderSpec.Max = 15 - *dynamicFolders = append(*dynamicFolders, folderSpec) changed = true } diff --git 
a/pkg/cluster/manager/manager_deploy.go b/pkg/cluster/manager/manager_deploy.go index b03b79f..42ea191 100644 --- a/pkg/cluster/manager/manager_deploy.go +++ b/pkg/cluster/manager/manager_deploy.go @@ -45,7 +45,7 @@ func (m *Manager) DeployCluster(specification *spec.Specification) error { // wg.Add(1) // go func(index int, volumeSpec *spec.VolumeServerSpec) { // defer wg.Done() - if err := m.DeployVolumeServer(masters, volumeSpec, index); err != nil { + if err := m.DeployVolumeServer(masters, specification.GlobalOptions, volumeSpec, index); err != nil { deployErrors = append(deployErrors, fmt.Errorf("deploy to volume server %s:%d :%v", volumeSpec.Ip, volumeSpec.PortSsh, err)) } // }(index, volumeSpec)