aboutsummaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/driver/driver.go33
-rw-r--r--pkg/driver/mounter.go2
-rw-r--r--pkg/driver/volume.go2
-rw-r--r--pkg/mountmanager/client.go5
-rw-r--r--pkg/mountmanager/manager.go2
-rw-r--r--pkg/mountmanager/socket.go12
6 files changed, 40 insertions, 16 deletions
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index d162369..6bdcc3b 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -4,10 +4,12 @@ import (
"fmt"
"os"
"os/signal"
+ "path/filepath"
"syscall"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -27,8 +29,9 @@ type SeaweedFsDriver struct {
nodeID string
version string
- endpoint string
- mountEndpoint string
+ endpoint string
+ mountEndpoint string
+ volumeSocketDir string // directory for volume sockets, derived from mountEndpoint
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
@@ -55,15 +58,25 @@ func NewSeaweedFsDriver(name, filer, nodeID, endpoint, mountEndpoint string, ena
util.LoadConfiguration("security", false)
+ // Derive volumeSocketDir from mountEndpoint
+ volumeSocketDir := mountmanager.DefaultSocketDir
+ if mountEndpoint != "" {
+ _, address, err := mountmanager.ParseEndpoint(mountEndpoint)
+ if err == nil && address != "" {
+ volumeSocketDir = filepath.Dir(address)
+ }
+ }
+
n := &SeaweedFsDriver{
- endpoint: endpoint,
- mountEndpoint: mountEndpoint,
- nodeID: nodeID,
- name: name,
- version: version,
- filers: pb.ServerAddresses(filer).ToAddresses(),
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
- signature: util.RandomInt32(),
+ endpoint: endpoint,
+ mountEndpoint: mountEndpoint,
+ volumeSocketDir: volumeSocketDir,
+ nodeID: nodeID,
+ name: name,
+ version: version,
+ filers: pb.ServerAddresses(filer).ToAddresses(),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ signature: util.RandomInt32(),
}
n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index d86d82e..b96f4b8 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -68,7 +68,7 @@ func (m *mountServiceMounter) Mount(target string) (Unmounter, error) {
cacheBase = os.TempDir()
}
cacheDir := filepath.Join(cacheBase, m.volumeID)
- localSocket := mountmanager.LocalSocketPath(m.volumeID)
+ localSocket := mountmanager.LocalSocketPath(m.driver.volumeSocketDir, m.volumeID)
args, err := m.buildMountArgs(target, cacheDir, localSocket, filers)
if err != nil {
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 9e258a8..18d5c2a 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -29,7 +29,7 @@ func NewVolume(volumeID string, mounter Mounter, driver *SeaweedFsDriver) *Volum
return &Volume{
VolumeId: volumeID,
mounter: mounter,
- localSocket: mountmanager.LocalSocketPath(volumeID),
+ localSocket: mountmanager.LocalSocketPath(driver.volumeSocketDir, volumeID),
driver: driver,
}
}
diff --git a/pkg/mountmanager/client.go b/pkg/mountmanager/client.go
index 6288c44..7cbfe63 100644
--- a/pkg/mountmanager/client.go
+++ b/pkg/mountmanager/client.go
@@ -85,7 +85,10 @@ func (c *Client) doPost(path string, payload any, out any) error {
if err := json.NewDecoder(resp.Body).Decode(&errResp); err == nil && errResp.Error != "" {
return errors.New(errResp.Error)
}
- data, _ := io.ReadAll(resp.Body)
+ data, readErr := io.ReadAll(resp.Body)
+ if readErr != nil {
+ return fmt.Errorf("mount service error: %s (failed to read body: %v)", resp.Status, readErr)
+ }
return fmt.Errorf("mount service error: %s (%s)", resp.Status, string(data))
}
diff --git a/pkg/mountmanager/manager.go b/pkg/mountmanager/manager.go
index 0fa3e0a..7e94b89 100644
--- a/pkg/mountmanager/manager.go
+++ b/pkg/mountmanager/manager.go
@@ -98,7 +98,7 @@ func (m *Manager) Unmount(req *UnmountRequest) (*UnmountResponse, error) {
if ok, err := kubeMounter.IsMountPoint(entry.targetPath); ok || mount.IsCorruptedMnt(err) {
if err = kubeMounter.Unmount(entry.targetPath); err != nil {
- return &UnmountResponse{}, err
+ return nil, err
}
}
diff --git a/pkg/mountmanager/socket.go b/pkg/mountmanager/socket.go
index 1b8a079..f327dcb 100644
--- a/pkg/mountmanager/socket.go
+++ b/pkg/mountmanager/socket.go
@@ -2,15 +2,23 @@ package mountmanager
import (
"fmt"
+ "path/filepath"
"github.com/seaweedfs/seaweedfs/weed/util"
)
+// DefaultSocketDir is the default directory for volume sockets.
+const DefaultSocketDir = "/var/lib/seaweedfs-mount"
+
// LocalSocketPath returns the unix socket path used to communicate with the weed mount process.
-func LocalSocketPath(volumeID string) string {
+// The baseDir parameter should be the directory where sockets are stored (e.g., derived from mountEndpoint).
+func LocalSocketPath(baseDir, volumeID string) string {
+ if baseDir == "" {
+ baseDir = DefaultSocketDir
+ }
hash := util.HashToInt32([]byte(volumeID))
if hash < 0 {
hash = -hash
}
- return fmt.Sprintf("/var/lib/seaweedfs-mount/seaweedfs-mount-%d.sock", hash)
+ return filepath.Join(baseDir, fmt.Sprintf("seaweedfs-mount-%d.sock", hash))
}