diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-02-24 22:28:45 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-02-24 22:28:45 -0800 |
| commit | 6ab7368ef2556ef086d13c6d0d4454f1e98a5cd8 (patch) | |
| tree | 636d3d9cecfb44f71133c2387d01125fa75bc4a3 /weed/replication | |
| parent | 5bcb44eda9b1dba57abf8cd9ce3b2d18518bd100 (diff) | |
| download | seaweedfs-6ab7368ef2556ef086d13c6d0d4454f1e98a5cd8.tar.xz seaweedfs-6ab7368ef2556ef086d13c6d0d4454f1e98a5cd8.zip | |
filer: dynamically create bucket under /buckets folder
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 14 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 4 |
2 files changed, 10 insertions, 8 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 26c055da5..fe1e87b6b 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,10 +3,11 @@ package filersink import ( "context" "fmt" - "google.golang.org/grpc" "strings" "sync" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -14,7 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) { if len(sourceChunks) == 0 { return } @@ -23,7 +24,7 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ wg.Add(1) go func(chunk *filer_pb.FileChunk) { defer wg.Done() - replicatedChunk, e := fs.replicateOneChunk(ctx, chunk) + replicatedChunk, e := fs.replicateOneChunk(ctx, chunk, dir) if e != nil { err = e } @@ -35,9 +36,9 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ return } -func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { +func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) { - fileId, err := fs.fetchAndWrite(ctx, sourceChunk) + fileId, err := fs.fetchAndWrite(ctx, sourceChunk, dir) if err != nil { return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err) } @@ -52,7 +53,7 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p }, nil } -func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) { +func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString()) if err != nil { @@ -71,6 +72,7 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi Collection: fs.collection, TtlSec: fs.ttlSec, DataCenter: fs.dataCenter, + ParentPath: dir, } resp, err := client.AssignVolume(ctx, request) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index de99fbe1c..8c4c39bc4 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -105,7 +105,7 @@ func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_p } } - replicatedChunks, err := fs.replicateChunks(ctx, entry.Chunks) + replicatedChunks, err := fs.replicateChunks(ctx, entry.Chunks, dir) if err != nil { glog.V(0).Infof("replicate entry chunks %s: %v", key, err) @@ -184,7 +184,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file } // replicate the chunks that are new in the source - replicatedChunks, err := fs.replicateChunks(ctx, newChunks) + replicatedChunks, err := fs.replicateChunks(ctx, newChunks, newParentPath) if err != nil { return true, fmt.Errorf("replicte %s chunks error: %v", key, err) } |
