diff options
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/driver/controllerserver.go | 5 | ||||
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 21 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 11 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 2 | ||||
| -rw-r--r-- | pkg/k8s/client.go | 42 |
5 files changed, 53 insertions, 28 deletions
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index 0d8b671..1bd2169 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "fmt" "io" - "strconv" "strings" "github.com/container-storage-interface/spec/lib/go/csi" @@ -48,10 +47,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } glog.V(4).Infof("params:%v", params) capacity := req.GetCapacityRange().GetRequiredBytes() - if capacity > 0 { - glog.V(4).Infof("volume capacity: %d", capacity) - params["volumeCapacity"] = strconv.FormatInt(capacity, 10) - } if err := filer_pb.Mkdir(cs.Driver, "/buckets", volumeId, nil); err != nil { return nil, fmt.Errorf("error setting bucket metadata: %v", err) diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go index 8f4ab9e..56ba98b 100644 --- a/pkg/driver/mounter_seaweedfs.go +++ b/pkg/driver/mounter_seaweedfs.go @@ -4,7 +4,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "strings" "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" @@ -69,12 +68,6 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) { args = append(args, "-readOnly") } - // Handle volumeCapacity from controllerserver.go:51 - if value, ok := seaweedFs.volContext["volumeCapacity"]; ok { - capacityMB := parseVolumeCapacity(value) - args = append(args, fmt.Sprintf("-collectionQuotaMB=%d", capacityMB)) - } - // Values for override-able args // Whitelist for merging with volContext argsMap := map[string]string{ @@ -126,7 +119,6 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) { // Explicitly ignored volContext args e.g. handled somewhere else ignoreArgs := []string{ - "volumeCapacity", "dataLocality", } @@ -186,19 +178,6 @@ func GetLocalSocket(volumeID string) string { return socket } -func parseVolumeCapacity(volumeCapacity string) int64 { - var capacity int64 - - if vCap, err := strconv.ParseInt(volumeCapacity, 10, 64); err != nil { - glog.Errorf("volumeCapacity %s can not be parsed to Int64, err is: %v", volumeCapacity, err) - } else { - capacity = vCap - } - - capacityMB := capacity / 1024 / 1024 - return capacityMB -} - func in_arr(arr []string, val string) bool { for _, v := range arr { if val == v { diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 130c2a5..616c79b 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/k8s" "github.com/seaweedfs/seaweedfs/weed/glog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -79,6 +80,14 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } return nil, status.Error(codes.Internal, err.Error()) } + //k8s api get Capacity + if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil { + if err := volume.Quota(capacity); err != nil { + return nil, err + } + } else { + return nil, err + } ns.volumes.Store(volumeID, volume) glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath) @@ -259,7 +268,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV defer volumeMutex.Unlock() if volume, ok := ns.volumes.Load(volumeID); ok { - if err := volume.(*Volume).Expand(requiredBytes); err != nil { + if err := volume.(*Volume).Quota(requiredBytes); err != nil { return nil, err } } diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index a6d7ddb..ac0a80a 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -80,7 +80,7 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly return nil } -func (vol *Volume) Expand(sizeByte int64) error { +func (vol *Volume) Quota(sizeByte int64) error { target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket) dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go new file mode 100644 index 0000000..8861d0a --- /dev/null +++ b/pkg/k8s/client.go @@ -0,0 +1,42 @@ +package k8s + +import ( + "context" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func NewInCluster() (*kubernetes.Clientset, error) { + //creates the in-cluster config + config, err := rest.InClusterConfig() + if err != nil { + panic(err.Error()) + } + + // creates the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(err.Error()) + } + return clientset, nil +} + +func GetVolumeCapacity(volumeId string) (int64, error) { + client, err := NewInCluster() + if err != nil { + return 0, err + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if volume, err := client.CoreV1().PersistentVolumes().Get(ctx, volumeId, metav1.GetOptions{}); err != nil { + return 0, err + } else { + storage := volume.Spec.Capacity.Storage() + capacity, _ := storage.AsInt64() + return capacity, nil + } +} |
