aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/replication/sink/filersink/fetch_write.go14
-rw-r--r--weed/replication/sink/filersink/filer_sink.go31
2 files changed, 15 insertions, 30 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 74f3a72bb..07b091073 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -3,7 +3,6 @@ package filersink
import (
"context"
"fmt"
- "strings"
"sync"
"google.golang.org/grpc"
@@ -69,7 +68,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string)
var host string
var auth security.EncodedJwt
- if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -114,7 +113,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string)
return
}
-func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
@@ -122,11 +121,6 @@ func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error)
}, fs.grpcAddress, fs.grpcDialOption)
}
-
-func volumeId(fileId string) string {
- lastCommaIndex := strings.LastIndex(fileId, ",")
- if lastCommaIndex > 0 {
- return fileId[:lastCommaIndex]
- }
- return fileId
+func (fs *FilerSink) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index ffce853b8..5f055f9d1 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -65,30 +65,21 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
}
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
- return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- dir, name := util.FullPath(key).DirAndName()
-
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir,
- Name: name,
- IsDeleteData: deleteIncludeChunks,
- }
-
- glog.V(1).Infof("delete entry: %v", request)
- _, err := client.DeleteEntry(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("delete entry %s: %v", key, err)
- return fmt.Errorf("delete entry %s: %v", key, err)
- }
+ dir, name := util.FullPath(key).DirAndName()
- return nil
- })
+ glog.V(1).Infof("delete entry: %v", key)
+ err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false)
+ if err != nil {
+ glog.V(0).Infof("delete entry %s: %v", key, err)
+ return fmt.Errorf("delete entry %s: %v", key, err)
+ }
+ return nil
}
func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
- return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(key).DirAndName()
@@ -140,7 +131,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
// read existing entry
var existingEntry *filer_pb.Entry
- err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
@@ -192,7 +183,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
}
// save updated meta data
- return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: newParentPath,