diff options
Diffstat (limited to 'weed/replication/sink/filersink/fetch_write.go')
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 26c055da5..fe1e87b6b 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,10 +3,11 @@ package filersink import ( "context" "fmt" - "google.golang.org/grpc" "strings" "sync" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -14,7 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) { if len(sourceChunks) == 0 { return } @@ -23,7 +24,7 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ wg.Add(1) go func(chunk *filer_pb.FileChunk) { defer wg.Done() - replicatedChunk, e := fs.replicateOneChunk(ctx, chunk) + replicatedChunk, e := fs.replicateOneChunk(ctx, chunk, dir) if e != nil { err = e } @@ -35,9 +36,9 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ return } -func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { +func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) { - fileId, err := fs.fetchAndWrite(ctx, sourceChunk) + fileId, err := fs.fetchAndWrite(ctx, sourceChunk, dir) if err != nil { return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err) } @@ -52,7 +53,7 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p }, nil } -func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) { +func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString()) if err != nil { @@ -71,6 +72,7 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi Collection: fs.collection, TtlSec: fs.ttlSec, DataCenter: fs.dataCenter, + ParentPath: dir, } resp, err := client.AssignVolume(ctx, request) |
