aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/filer.go38
-rw-r--r--weed/filer2/filer_master.go60
2 files changed, 27 insertions, 71 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 2deb8ffd5..1f2697cda 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -1,30 +1,30 @@
package filer2
import (
+ "context"
"fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/karlseguin/ccache"
- "os"
- "path/filepath"
- "strings"
- "time"
)
type Filer struct {
- masters []string
store FilerStore
directoryCache *ccache.Cache
-
- currentMaster string
+ MasterClient *wdclient.MasterClient
}
func NewFiler(masters []string) *Filer {
return &Filer{
- masters: masters,
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
}
}
@@ -36,6 +36,14 @@ func (f *Filer) DisableDirectoryCache() {
f.directoryCache = nil
}
+func (fs *Filer) GetMaster() string {
+ return fs.MasterClient.GetMaster()
+}
+
+func (fs *Filer) KeepConnectedToMaster() {
+ fs.MasterClient.KeepConnectedToMaster()
+}
+
func (f *Filer) CreateEntry(entry *Entry) error {
dirParts := strings.Split(string(entry.FullPath), "/")
@@ -198,9 +206,17 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil {
- glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err)
- }
+ f.DeleteFileByFileId(chunk.FileId)
+ }
+}
+
+func (f *Filer) DeleteFileByFileId(fileId string) {
+ fileUrlOnVolume, err := f.MasterClient.LookupFileId(fileId)
+ if err != nil {
+ glog.V(0).Infof("can not find file %s: %v", fileId, err)
+ }
+ if err := operation.DeleteFromVolumeServer(fileUrlOnVolume, ""); err != nil {
+ glog.V(0).Infof("deleting file %s: %v", fileId, err)
}
}
diff --git a/weed/filer2/filer_master.go b/weed/filer2/filer_master.go
deleted file mode 100644
index 63c3ef452..000000000
--- a/weed/filer2/filer_master.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (fs *Filer) GetMaster() string {
- return fs.currentMaster
-}
-
-func (fs *Filer) KeepConnectedToMaster() {
- glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
- for _, master := range fs.masters {
- glog.V(0).Infof("Connecting to %v", master)
- withMasterClient(master, func(client master_pb.SeaweedClient) error {
- stream, err := client.KeepConnected(context.Background())
- if err != nil {
- glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
- return err
- }
-
- glog.V(0).Infof("Connected to %v", master)
- fs.currentMaster = master
-
- for {
- time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
-
- if err = stream.Send(&master_pb.Empty{}); err != nil {
- glog.V(0).Infof("failed to send to %s: %v", master, err)
- return err
- }
-
- if _, err = stream.Recv(); err != nil {
- glog.V(0).Infof("failed to receive from %s: %v", master, err)
- return err
- }
- }
- })
- fs.currentMaster = ""
- }
-}
-
-func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
-
- grpcConnection, err := util.GrpcDial(master)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", master, err)
- }
- defer grpcConnection.Close()
-
- client := master_pb.NewSeaweedClient(grpcConnection)
-
- return fn(client)
-}