diff options
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/notification_kafka.go | 10 | ||||
| -rw-r--r-- | weed/replication/notifications.go | 2 | ||||
| -rw-r--r-- | weed/replication/replicator.go | 11 | ||||
| -rw-r--r-- | weed/replication/sink/fetch_write.go | 123 | ||||
| -rw-r--r-- | weed/replication/sink/filer_sink.go | 140 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 22 |
6 files changed, 231 insertions, 77 deletions
diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go index aaf08a96c..d10175757 100644 --- a/weed/replication/notification_kafka.go +++ b/weed/replication/notification_kafka.go @@ -1,17 +1,17 @@ package replication import ( + "fmt" + "github.com/Shopify/sarama" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" "github.com/chrislusf/seaweedfs/weed/glog" - "fmt" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" ) func init() { - NotificationInputs = append(NotificationInputs, &KafkaInput{ - }) + NotificationInputs = append(NotificationInputs, &KafkaInput{}) } type KafkaInput struct { diff --git a/weed/replication/notifications.go b/weed/replication/notifications.go index ff40c3aad..6ae95d36b 100644 --- a/weed/replication/notifications.go +++ b/weed/replication/notifications.go @@ -1,8 +1,8 @@ package replication import ( - "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) type NotificationInput interface { diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 4f5d5203e..66d194128 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -1,12 +1,13 @@ package replication import ( - "github.com/chrislusf/seaweedfs/weed/replication/sink" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/replication/source" "strings" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" ) type Replicator struct { @@ -30,6 +31,8 @@ func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { } } + sink.SetSourceFiler(source) + return &Replicator{ sink: sink, source: source, diff --git a/weed/replication/sink/fetch_write.go b/weed/replication/sink/fetch_write.go new file mode 100644 index 000000000..4408a712e --- /dev/null +++ b/weed/replication/sink/fetch_write.go @@ -0,0 +1,123 @@ +package sink + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { + if len(sourceChunks) == 0 { + return + } + var wg sync.WaitGroup + for _, sourceChunk := range sourceChunks { + wg.Add(1) + go func(chunk *filer_pb.FileChunk) { + defer wg.Done() + replicatedChunk, e := fs.replicateOneChunk(chunk) + if e != nil { + err = e + } + replicatedChunks = append(replicatedChunks, replicatedChunk) + }(sourceChunk) + } + wg.Wait() + + return +} + +func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { + + fileId, err := fs.fetchAndWrite(sourceChunk) + if err != nil { + return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) + } + + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: sourceChunk.Offset, + Size: sourceChunk.Size, + Mtime: sourceChunk.Mtime, + ETag: sourceChunk.ETag, + SourceFileId: sourceChunk.FileId, + }, nil +} + +func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { + + filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.FileId) + if err != nil { + return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err) + } + defer readCloser.Close() + + var host string + + if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: fs.replication, + Collection: fs.collection, + TtlSec: fs.ttlSec, + DataCenter: fs.dataCenter, + } + + resp, err := client.AssignVolume(context.Background(), request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + + fileId, host = resp.FileId, resp.Url + + return nil + }); err != nil { + return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + + glog.V(3).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) + + uploadResult, err := operation.Upload(fileUrl, filename, readCloser, + "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, "") + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) + return "", fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + return "", fmt.Errorf("upload result: %v", uploadResult.Error) + } + + return +} + +func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + grpcConnection, err := util.GrpcDial(fs.grpcAddress) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) + } + defer grpcConnection.Close() + + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + + return fn(client) +} + +func volumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go index 387bffb58..422bdeabc 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filer_sink.go @@ -1,14 +1,14 @@ package sink import ( - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "context" "fmt" - "strings" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "context" - "sync" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" ) type ReplicationSink interface { @@ -16,11 +16,17 @@ type ReplicationSink interface { CreateEntry(key string, entry *filer_pb.Entry) error UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error GetDirectory() string + SetSourceFiler(s *source.FilerSource) } type FilerSink struct { grpcAddress string dir string + filerSource *source.FilerSource + replication string + collection string + ttlSec int32 + dataCenter string } func (fs *FilerSink) GetDirectory() string { @@ -34,6 +40,10 @@ func (fs *FilerSink) Initialize(configuration util.Configuration) error { ) } +func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { + fs.filerSource = s +} + func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.dir = dir @@ -65,13 +75,15 @@ func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteInclud func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { - replicatedChunks, err := replicateChunks(entry.Chunks) + replicatedChunks, err := fs.replicateChunks(entry.Chunks) if err != nil { glog.V(0).Infof("replicate entry chunks %s: %v", key, err) return fmt.Errorf("replicate entry chunks %s: %v", key, err) } + glog.V(0).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks) + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -96,70 +108,84 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { }) } -func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { - return nil -} +func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (err error) { -func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + ctx := context.Background() - grpcConnection, err := util.GrpcDial(fs.grpcAddress) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) - } - defer grpcConnection.Close() + dir, name := filer2.FullPath(key).DirAndName() - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + // find out what changed + deletedChunks, newChunks := compareChunks(oldEntry, newEntry) - return fn(client) -} + // read existing entry + var entry *filer_pb.Entry + err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + } -func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { - if len(sourceChunks) == 0 { - return - } - var wg sync.WaitGroup - for _, s := range sourceChunks { - wg.Add(1) - go func(chunk *filer_pb.FileChunk) { - defer wg.Done() - replicatedChunk, e := replicateOneChunk(chunk) - if e != nil { - err = e - } - replicatedChunks = append(replicatedChunks, replicatedChunk) - }(s) - } - wg.Wait() + glog.V(4).Infof("lookup directory entry: %v", request) + resp, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + glog.V(0).Infof("lookup %s: %v", key, err) + return err + } - return -} + entry = resp.Entry -func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { + return nil + }) - fileId, err := fetchAndWrite(sourceChunk) if err != nil { - return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) + return err } - return &filer_pb.FileChunk{ - FileId: fileId, - Offset: sourceChunk.Offset, - Size: sourceChunk.Size, - Mtime: sourceChunk.Mtime, - ETag: sourceChunk.ETag, - SourceFileId: sourceChunk.FileId, - }, nil -} + // delete the chunks that are deleted from the source + if deleteIncludeChunks { + // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks + entry.Chunks = minusChunks(entry.Chunks, deletedChunks) + } + + // replicate the chunks that are new in the source + replicatedChunks, err := fs.replicateChunks(newChunks) + entry.Chunks = append(entry.Chunks, replicatedChunks...) + + // save updated meta data + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + } -func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { + if _, err := client.UpdateEntry(ctx, request); err != nil { + return fmt.Errorf("update entry %s: %v", key, err) + } + + return nil + }) + +} +func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) { + deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks) + newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks) + return +} +func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { + for _, a := range as { + found := false + for _, b := range bs { + if a.FileId == b.FileId { + found = true + break + } + } + if !found { + delta = append(delta, a) + } + } return } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index f6a4cc55f..69e74a63c 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -1,13 +1,15 @@ package source import ( - "io" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" + "io" + "net/http" "strings" - "context" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) type ReplicationSource interface { @@ -32,7 +34,7 @@ func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { return nil } -func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) { +func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { vid2Locations := make(map[string]*filer_pb.Locations) @@ -55,21 +57,21 @@ func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err erro if err != nil { glog.V(1).Infof("replication lookup volume id %s: %v", vid, err) - return nil, fmt.Errorf("replication lookup volume id %s: %v", vid, err) + return "", nil, nil, fmt.Errorf("replication lookup volume id %s: %v", vid, err) } locations := vid2Locations[vid] if locations == nil || len(locations.Locations) == 0 { glog.V(1).Infof("replication locate volume id %s: %v", vid, err) - return nil, fmt.Errorf("replication locate volume id %s: %v", vid, err) + return "", nil, nil, fmt.Errorf("replication locate volume id %s: %v", vid, err) } fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part) - _, readCloser, err = util.DownloadUrl(fileUrl) + filename, header, readCloser, err = util.DownloadFile(fileUrl) - return readCloser, err + return filename, header, readCloser, err } func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { |
