about summary refs log tree commit diff
diff options
context:
space:
mode:
author Chris Lu <chris.lu@gmail.com> 2021-01-24 00:01:44 -0800
committer Chris Lu <chris.lu@gmail.com> 2021-01-24 00:01:44 -0800
commit 80b869268898798c662618b11933af82ee374f1c (patch)
tree aaa2904c1a55d0cb054db324b6964df540e84485
parent 6897f1bfbc0731b5e1581313a3e248ad7991cef8 (diff)
download seaweedfs-80b869268898798c662618b11933af82ee374f1c.tar.xz
seaweedfs-80b869268898798c662618b11933af82ee374f1c.zip
filer.sync: replicate outside of either cluster, only need to see filers
-rw-r--r-- weed/command/filer_sync.go 54
-rw-r--r-- weed/pb/grpc_client_server.go 16
-rw-r--r-- weed/replication/sink/filersink/fetch_write.go 4
-rw-r--r-- weed/replication/sink/filersink/filer_sink.go 32
-rw-r--r-- weed/replication/source/filer_source.go 19
-rw-r--r-- weed/server/filer_server_handlers.go 30
6 files changed, 106 insertions, 49 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index af0a624b1..587174059 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -20,19 +20,21 @@ import (
)
type SyncOptions struct {
- isActivePassive *bool
- filerA *string
- filerB *string
- aPath *string
- bPath *string
- aReplication *string
- bReplication *string
- aCollection *string
- bCollection *string
- aTtlSec *int
- bTtlSec *int
- aDebug *bool
- bDebug *bool
+ isActivePassive *bool
+ filerA *string
+ filerB *string
+ aPath *string
+ bPath *string
+ aReplication *string
+ bReplication *string
+ aCollection *string
+ bCollection *string
+ aTtlSec *int
+ bTtlSec *int
+ aDebug *bool
+ bDebug *bool
+ aProxyByFiler *bool
+ bProxyByFiler *bool
}
var (
@@ -43,7 +45,7 @@ var (
func init() {
cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
- syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true")
+ syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
@@ -54,6 +56,8 @@ func init() {
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
+ syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", true, "read and write file chunks by filer A instead of volume servers")
+ syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", true, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
@@ -62,8 +66,8 @@ func init() {
var cmdFilerSynchronize = &Command{
UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
- Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters",
- Long: `continuously synchronize file changes between two active-active or active-passive filers
+ Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
+ Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
@@ -86,8 +90,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB,
- *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug)
+ err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler,
+ *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler,
+ *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@@ -98,8 +103,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA,
- *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug)
+ err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler,
+ *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler,
+ *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@@ -113,8 +119,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
-func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string,
- replicationStr, collection string, ttlSec int, debug bool) error {
+func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
+ replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler, debug bool) error {
// read source filer signature
sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
@@ -138,9 +144,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// create filer sink
filerSource := &source.FilerSource{}
- filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath)
+ filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
filerSink := &filersink.FilerSink{}
- filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption)
+ filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index f19af43b2..4d78d769f 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -138,6 +138,22 @@ func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
}
+func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
+ hostnameAndPort := strings.Split(grpcAddress, ":")
+ if len(hostnameAndPort) != 2 {
+ return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress)
+ }
+
+ grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
+ if parseErr != nil {
+ return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
+ }
+
+ port := int(grpcPort) - 10000
+
+ return fmt.Sprintf("%s:%d", hostnameAndPort[0], port)
+}
+
func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := ParseServerToGrpcAddress(master)
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d193ff81c..b062adcfe 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -30,6 +30,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
replicatedChunk, e := fs.replicateOneChunk(chunk, path)
if e != nil {
err = e
+ return
}
replicatedChunks[index] = replicatedChunk
}(sourceChunk, chunkIndex)
@@ -97,6 +98,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if fs.writeChunkByFiler {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
+ }
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 9c0e4176f..600ff51f0 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -3,6 +3,7 @@ package filersink
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@@ -18,14 +19,16 @@ import (
)
type FilerSink struct {
- filerSource *source.FilerSource
- grpcAddress string
- dir string
- replication string
- collection string
- ttlSec int32
- dataCenter string
- grpcDialOption grpc.DialOption
+ filerSource *source.FilerSource
+ grpcAddress string
+ dir string
+ replication string
+ collection string
+ ttlSec int32
+ dataCenter string
+ grpcDialOption grpc.DialOption
+ address string
+ writeChunkByFiler bool
}
func init() {
@@ -42,26 +45,33 @@ func (fs *FilerSink) GetSinkToDirectory() string {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
return fs.DoInitialize(
+ "",
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
configuration.GetString(prefix+"replication"),
configuration.GetString(prefix+"collection"),
configuration.GetInt(prefix+"ttlSec"),
- security.LoadClientTLS(util.GetViper(), "grpc.client"))
+ security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ false)
}
func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
fs.filerSource = s
}
-func (fs *FilerSink) DoInitialize(grpcAddress string, dir string,
- replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption) (err error) {
+func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
+ replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
+ fs.address = address
+ if fs.address == "" {
+ fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
+ }
fs.grpcAddress = grpcAddress
fs.dir = dir
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
fs.grpcDialOption = grpcDialOption
+ fs.writeChunkByFiler = writeChunkByFiler
return nil
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index ff4f2eb26..3982360b0 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -25,19 +25,28 @@ type FilerSource struct {
grpcAddress string
grpcDialOption grpc.DialOption
Dir string
+ address string
+ proxyByFiler bool
}
func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
return fs.DoInitialize(
+ "",
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
+ false,
)
}
-func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) {
+func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) {
+ fs.address = address
+ if fs.address == "" {
+ fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
+ }
fs.grpcAddress = grpcAddress
fs.Dir = dir
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+ fs.proxyByFiler = readChunkFromFiler
return nil
}
@@ -81,9 +90,13 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return
}
-func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
+func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
+
+ if fs.proxyByFiler {
+ return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId)
+ }
- fileUrls, err := fs.LookupFileId(part)
+ fileUrls, err := fs.LookupFileId(fileId)
if err != nil {
return "", nil, nil, err
}
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 5b93c6d08..fcafe3893 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -3,30 +3,38 @@ package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
+
+ start := time.Now()
+
+ // proxy to volume servers
+ var fileId string
+ if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
+ fileId = r.RequestURI[len("/?proxyChunkId="):]
+ }
+ if fileId != "" {
+ stats.FilerRequestCounter.WithLabelValues("proxy").Inc()
+ fs.proxyToVolumeServer(w,r,fileId)
+ stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds())
+ return
+ }
+
w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
if r.Header.Get("Origin") != "" {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true")
}
- start := time.Now()
switch r.Method {
case "GET":
- fileId := r.FormValue("proxyToFileId")
- if fileId != "" {
- stats.FilerRequestCounter.WithLabelValues("proxy").Inc()
- fs.proxyToVolumeServer(w,r,fileId)
- stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds())
- } else {
- stats.FilerRequestCounter.WithLabelValues("get").Inc()
- fs.GetOrHeadHandler(w, r, true)
- stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
- }
+ stats.FilerRequestCounter.WithLabelValues("get").Inc()
+ fs.GetOrHeadHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)