Skip to content

Commit

Permalink
Snapshot Support
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen committed May 26, 2020
1 parent c68b24e commit d0c4ab0
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 34 deletions.
18 changes: 18 additions & 0 deletions docs/contributing/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
- Login to `gcloud` (`gcloud auth login`)
* Google Cloud Project
- Create Test Project (`gcloud projects create [PROJECT_ID] --name=[PROJECT_NAME]`)
* Enable Storage Transfer API
- Visit https://console.developers.google.com/apis/library/storagetransfer.googleapis.com
- Enable API
- Setup Access (https://cloud.google.com/storage-transfer/docs/configure-access)
- `gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:project-[PROJECT_NUMBER]@storage-transfer-service.iam.gserviceaccount.com --role=roles/storage.admin`
* Google Cloud Service Account
- Create (`gcloud iam service-accounts create [ACCOUNT_NAME] --display-name="Test Account" --description="Test Account for GCS CSI" --project=[PROJECT_ID]`)
- Create Key (`gcloud iam service-accounts keys create service-account.json --iam-account=[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --project=[PROJECT_ID]`)
- Give Storage Admin Permission (`gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --role=roles/storage.admin`)
- Give Transfer User Permission (`gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --role=roles/storagetransfer.user`)
* Create Secret
- `kubectl create secret generic csi-gcs-secret --from-file=key=service-account.json`
* Pull Needed Images
Expand Down Expand Up @@ -94,6 +100,18 @@ ControllerValidateVolumeCapabilitiesSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
ControllerExpandVolumeSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
CreateSnapshotSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
DeleteSnapshotSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
```
## Develop inside Docker
Expand Down
1 change: 1 addition & 0 deletions examples/dynamic/sc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ parameters:
csi.storage.k8s.io/provisioner-secret-namespace: default
csi.storage.k8s.io/controller-expand-secret-name: csi-gcs-secret-creator
csi.storage.k8s.io/controller-expand-secret-namespace: default
gcs.csi.ofek.dev/project-id: joshmartin-csi-gcs-test
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
cloud.google.com/go v0.38.0
github.com/container-storage-interface/spec v1.2.0
github.com/golang/protobuf v1.3.2
github.com/kubernetes-csi/csi-lib-utils v0.7.0
github.com/kubernetes-csi/csi-test v2.2.0+incompatible // indirect
github.com/kubernetes-csi/csi-test/v3 v3.1.0
Expand Down
250 changes: 216 additions & 34 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package driver
import (
"context"
"fmt"
"strconv"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/ofek/csi-gcs/pkg/flags"
"github.com/ofek/csi-gcs/pkg/util"
Expand Down Expand Up @@ -77,6 +79,8 @@ func (d *GCSDriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
}
defer util.CleanupKey(keyFile, KeyStoragePath)

newCapacity := int64(req.GetCapacityRange().GetRequiredBytes())

// Creates a client.
client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile))
if err != nil {
Expand All @@ -86,43 +90,84 @@ func (d *GCSDriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
// Creates a Bucket instance.
bucket := client.Bucket(options[flags.FLAG_BUCKET])

projectId, projectIdExists := options[flags.FLAG_PROJECT_ID]
if !projectIdExists {
return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, snapshot can't be restored: %s", options[flags.FLAG_BUCKET])
}

// Check if Bucket Exists
_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {
klog.V(2).Infof("Bucket '%s' exists", options[flags.FLAG_BUCKET])
} else {
klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET])

projectId, projectIdExists := options[flags.FLAG_PROJECT_ID]
if !projectIdExists {
return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, bucket can't be created: %s", options[flags.FLAG_BUCKET])
if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

existingCapacity, err := util.BucketCapacity(bucketAttrs)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err)
}

// Check / Set Capacity
if existingCapacity == 0 {
_, err = util.SetBucketCapacity(ctx, bucket, newCapacity)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to set bucket capacity: %v", err)
}
} else if existingCapacity < newCapacity {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", options[flags.FLAG_BUCKET]))
}
} else {
klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET])

if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION]}); err != nil {
if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION], Labels: map[string]string{
"capacity": strconv.FormatInt(newCapacity, 10),
"type": "volume",
}}); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create bucket: %v", err)
}
}

// Get Capacity
bucketAttrs, err := bucket.Attrs(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket attrs: %v", err)
}
if snapshot := req.GetVolumeContentSource().GetSnapshot(); snapshot != nil {
// Creates a Bucket instance.
snapshotBucket := client.Bucket(snapshot.SnapshotId)

existingCapacity, err := util.BucketCapacity(bucketAttrs)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err)
}
// Check if Bucket Exists
snapshotBucketAttrs, err := snapshotBucket.Attrs(ctx)
if err == nil {

// Check / Set Capacity
newCapacity := int64(req.GetCapacityRange().GetRequiredBytes())
if existingCapacity == 0 {
_, err = util.SetBucketCapacity(ctx, bucket, newCapacity)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to set bucket capacity: %v", err)
if !util.BucketIs(snapshotBucketAttrs, "snapshot") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot")
}

err = util.CopyBucketContent(ctx, keyFile, projectId, snapshot.SnapshotId, options[flags.FLAG_BUCKET])
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err)
}
} else {
return nil, status.Errorf(codes.NotFound, "Snapshot does not exist")
}
}
if cloneVolume := req.GetVolumeContentSource().GetVolume(); cloneVolume != nil {
// Creates a Bucket instance.
cloneVolumeBucket := client.Bucket(cloneVolume.VolumeId)

// Check if Bucket Exists
cloneVolumeBucketAttrs, err := cloneVolumeBucket.Attrs(ctx)
if err == nil {

if !util.BucketIs(cloneVolumeBucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

err = util.CopyBucketContent(ctx, keyFile, projectId, cloneVolume.VolumeId, options[flags.FLAG_BUCKET])
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err)
}
} else {
return nil, status.Errorf(codes.NotFound, "Volume does not exist")
}
} else if existingCapacity < newCapacity {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", options[flags.FLAG_BUCKET]))
}

return &csi.CreateVolumeResponse{
Expand Down Expand Up @@ -156,8 +201,12 @@ func (d *GCSDriver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque
// Creates a Bucket instance.
bucket := client.Bucket(req.VolumeId)

_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {
if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

if err := bucket.Delete(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "Error deleting bucket %s, %v", req.VolumeId, err)
}
Expand Down Expand Up @@ -187,6 +236,20 @@ func (d *GCSDriver) ControllerGetCapabilities(ctx context.Context, req *csi.Cont
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
},
},
},
},
}, nil
}
Expand Down Expand Up @@ -218,12 +281,16 @@ func (d *GCSDriver) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
// Creates a Bucket instance.
bucket := client.Bucket(bucketName)

_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)

if err != nil {
return nil, status.Error(codes.NotFound, "volume does not exist")
}

if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

for _, capability := range req.GetVolumeCapabilities() {
if capability.GetMount() != nil && capability.GetBlock() == nil {
continue
Expand Down Expand Up @@ -268,13 +335,129 @@ func (d *GCSDriver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest
func (d *GCSDriver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
klog.V(4).Infof("Method CreateSnapshot called with: %s", protosanitizer.StripSecrets(req))

return nil, status.Error(codes.Unimplemented, "")
if req.SourceVolumeId == "" {
return nil, status.Errorf(codes.InvalidArgument, "SourceVolumeId cannot be empty")
}
if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Name field cannot be empty")
}

// Default Options
var options = map[string]string{
"bucket": util.BucketName(req.Name),
"location": "US",
}

// Merge Secret Options
options = flags.MergeSecret(options, req.Secrets)

// Merge Context
if req.Parameters != nil {
options = flags.MergeAnnotations(options, req.Parameters)
}

// Retrieve Key Secret
keyFile, err := util.GetKey(req.Secrets, KeyStoragePath)
if err != nil {
return nil, err
}
defer util.CleanupKey(keyFile, KeyStoragePath)

// Creates a client.
client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile))
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create client: %v", err)
}

// Creates a Bucket instance.
bucket := client.Bucket(options[flags.FLAG_BUCKET])

projectId, projectIdExists := options[flags.FLAG_PROJECT_ID]
if !projectIdExists {
return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, bucket can't be created: %s", options[flags.FLAG_BUCKET])
}

// Check if Bucket Exists
bucketAttrs, err := bucket.Attrs(ctx)
if bucketAttrs != nil && err == nil {
klog.V(2).Infof("Bucket '%s' exists", options[flags.FLAG_BUCKET])

if !util.BucketIs(bucketAttrs, "snapshot") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot")
}

for labelName, labelValue := range bucketAttrs.Labels {
if labelName == "volume" && labelValue != req.SourceVolumeId {
return nil, status.Errorf(codes.AlreadyExists, "Bucket is for another volume")
}
}
} else {
klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET])

if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION], Labels: map[string]string{
"volume": req.SourceVolumeId,
"type": "snapshot",
}}); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create bucket: %v", err)
}
}

creationTime := ptypes.TimestampNow()

err = util.CopyBucketContent(ctx, keyFile, projectId, req.SourceVolumeId, options[flags.FLAG_BUCKET])
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err)
}

return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SnapshotId: options[flags.FLAG_BUCKET],
SourceVolumeId: req.SourceVolumeId,
CreationTime: creationTime,
ReadyToUse: true,
},
}, nil
}

func (d *GCSDriver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
klog.V(4).Infof("Method DeleteSnapshot called with: %s", protosanitizer.StripSecrets(req))

return nil, status.Error(codes.Unimplemented, "")
if req.SnapshotId == "" {
return nil, status.Errorf(codes.InvalidArgument, "SnapshotId is invalid (empty string)")
}

// Retrieve Key Secret
keyFile, err := util.GetKey(req.Secrets, KeyStoragePath)
if err != nil {
return nil, err
}
defer util.CleanupKey(keyFile, KeyStoragePath)

// Creates a client.
client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile))
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create client: %v", err)
}

// Creates a Bucket instance.
bucket := client.Bucket(req.SnapshotId)

// Check if Bucket Exists
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {

if !util.BucketIs(bucketAttrs, "snapshot") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot")
}

if err := bucket.Delete(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "Error deleting bucket %s, %v", req.SnapshotId, err)
}
} else {
klog.V(2).Infof("Bucket '%s' does not exist, not deleting", req.SnapshotId)
}

return &csi.DeleteSnapshotResponse{}, nil
}

func (d *GCSDriver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
Expand Down Expand Up @@ -307,19 +490,18 @@ func (d *GCSDriver) ControllerExpandVolume(ctx context.Context, req *csi.Control
bucket := client.Bucket(req.VolumeId)

// Check if Bucket Exists
_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {
klog.V(2).Infof("Bucket '%s' exists", req.VolumeId)

if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}
} else {
return nil, status.Errorf(codes.NotFound, "Bucket '%s' does not exist", req.VolumeId)
}

// Get Capacity
bucketAttrs, err := bucket.Attrs(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket attrs: %v", err)
}

existingCapacity, err := util.BucketCapacity(bucketAttrs)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err)
Expand Down
Loading

0 comments on commit d0c4ab0

Please sign in to comment.