diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-09-17 00:27:56 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-09-17 00:27:56 -0700 |
| commit | 788acdf5275dfb7610afe9144c17d9128d1737f6 (patch) | |
| tree | 55e6701210157981eecd8a4f72d8c9eefa01c457 /weed/replication/source | |
| parent | 865a0179369601e496d385e47bdbda93dcc3f243 (diff) | |
| download | seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.tar.xz seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.zip | |
add WIP filer.replicate
Diffstat (limited to 'weed/replication/source')
| -rw-r--r-- | weed/replication/source/filer_source.go | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go new file mode 100644 index 000000000..49c623815 --- /dev/null +++ b/weed/replication/source/filer_source.go @@ -0,0 +1,97 @@ +package source + +import ( + "io" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "strings" + "context" +) + +type ReplicationSource interface { + ReadPart(part string) io.ReadCloser +} + +type FilerSource struct { + grpcAddress string + id string + dir string +} + +func (fs *FilerSource) Initialize(configuration util.Configuration) error { + return fs.initialize( + configuration.GetString("grpcAddress"), + configuration.GetString("id"), + configuration.GetString("directory"), + ) +} + +func (fs *FilerSource) initialize(grpcAddress string, id string, dir string) (err error) { + fs.grpcAddress = grpcAddress + fs.id = id + fs.dir = dir + return nil +} + +func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) { + + vid2Locations := make(map[string]*filer_pb.Locations) + + vid := volumeId(part) + + err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + glog.V(4).Infof("read lookup volume id locations: %v", vid) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + vid2Locations = resp.LocationsMap + + return nil + }) + + if err != nil { + glog.V(1).Infof("replication lookup volume id: %v", vid, err) + return nil, fmt.Errorf("replicationlookup volume id %v: %v", vid, err) + } + + locations := vid2Locations[vid] + + if locations == nil || len(locations.Locations) == 0 { + glog.V(1).Infof("replication locate volume id: %v", vid, err) + return nil, fmt.Errorf("replication locate volume id %v: %v", vid, err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part) + + _, readCloser, err = util.DownloadUrl(fileUrl) + + return readCloser, err +} + +func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + grpcConnection, err := util.GrpcDial(fs.grpcAddress) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) + } + defer grpcConnection.Close() + + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + + return fn(client) +} + +func volumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} |
