Diffstat (limited to 'weed')
88 files changed, 3091 insertions, 399 deletions
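The thread running through this change is checkpointed metadata subscription: the former readSyncOffset/writeSyncOffset helpers in weed/command/filer_sync.go become the generic getOffset/setOffset, parameterized by a key prefix ("sync." for filer.sync, "backup." for the new filer.backup), so each consumer persists its own progress in the filer's KV store. Below is a minimal sketch of that key/value layout, assuming the big-endian byte order of util.Uint32toBytes/Uint64toBytes and substituting encoding/binary for them; the standalone function names are illustrative, not part of the commit.

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// offsetKey mirrors how getOffset/setOffset build the KV key: a string
// prefix ("sync." or "backup.") followed by 4 placeholder bytes that are
// overwritten with the 32-bit signature (a filer signature for sync, a
// hash of the sink name and directory for backup).
func offsetKey(signaturePrefix string, signature int32) []byte {
	key := []byte(signaturePrefix + "____")
	binary.BigEndian.PutUint32(key[len(signaturePrefix):], uint32(signature))
	return key
}

// encodeOffset stores the timestamp of the last processed event as an
// 8-byte nanosecond value, the KV value written by setOffset.
func encodeOffset(tsNs int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(tsNs))
	return buf
}

func main() {
	key := offsetKey("backup.", 12345)
	value := encodeOffset(time.Now().UnixNano())
	fmt.Printf("key=%x value=%x\n", key, value)
}

On restart, the subscriber reads this value back, converts it with time.Unix(0, lastOffsetTsNs), and passes it as SinceNs to SubscribeMetadata, which is why both filer.backup and filer.meta.backup can resume instead of replaying from the beginning.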
diff --git a/weed/Makefile b/weed/Makefile index fd0843c22..8f1257d09 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -33,3 +33,7 @@ debug_webdav: debug_s3: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3 + +debug_filer_copy: + go build -gcflags="all=-N -l" + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index c1bc80c42..af0793c70 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -41,6 +41,7 @@ type BenchmarkOptions struct { grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient fsync *bool + useTcp *bool } var ( @@ -67,6 +68,7 @@ func init() { b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") + b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp") sharedBytes = make([]byte, 1024) } @@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { random := rand.New(rand.NewSource(time.Now().UnixNano())) + volumeTcpClient := wdclient.NewVolumeTcpClient() + for id := range idChan { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) @@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if !isSecure && assignResult.Auth != "" { isSecure = true } - if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { + if *b.useTcp { + if uploadByTcp(volumeTcpClient, fp) { + fileIdLineChan <- fp.Fid + s.completed++ + s.transferred += fileSize + } else { + s.failed++ + } + } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b } } +func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool { + + err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader) + if err != nil { + glog.Errorf("upload chunk err: %v", err) + return false + } + + return true +} + func readFileIds(fileName string, fileIdLineChan chan string) { file, err := os.Open(fileName) // For read access. 
if err != nil { diff --git a/weed/command/command.go b/weed/command/command.go index bbc2e0423..a9063eaa0 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -15,7 +15,9 @@ var Commands = []*Command{ cmdDownload, cmdExport, cmdFiler, + cmdFilerBackup, cmdFilerCat, + cmdFilerMetaBackup, cmdFilerMetaTail, cmdFilerReplicate, cmdFilerSynchronize, diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go new file mode 100644 index 000000000..888b46fe7 --- /dev/null +++ b/weed/command/filer_backup.go @@ -0,0 +1,157 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "io" + "time" +) + +type FilerBackupOptions struct { + isActivePassive *bool + filer *string + path *string + debug *bool + proxyByFiler *bool + timeAgo *time.Duration +} + +var ( + filerBackupOptions FilerBackupOptions +) + +func init() { + cmdFilerBackup.Run = runFilerBackup // break init cycle + filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster") + filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer") + filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers") + filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") + filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +} + +var cmdFilerBackup = &Command{ + UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ", + Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml", + Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml + + filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content, + and write to the destination. This is to replace filer.replicate command since additional message queue is not needed. + + If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute. + A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value. + +`, +} + +func runFilerBackup(cmd *Command, args []string) bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + util.LoadConfiguration("security", false) + util.LoadConfiguration("replication", true) + + for { + err := doFilerBackup(grpcDialOption, &filerBackupOptions) + if err != nil { + glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err) + time.Sleep(1747 * time.Millisecond) + } + } + + return true +} + +const ( + BackupKeyPrefix = "backup." 
+) + +func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error { + + // find data sink + config := util.GetViper() + dataSink := findSink(config) + if dataSink == nil { + return fmt.Errorf("no data sink configured in replication.toml") + } + + sourceFiler := *backupOption.filer + sourcePath := *backupOption.path + timeAgo := *backupOption.timeAgo + targetPath := dataSink.GetSinkToDirectory() + debug := *backupOption.debug + + // get start time for the data sink + startFrom := time.Unix(0, 0) + sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory()) + if timeAgo.Milliseconds() == 0 { + lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId)) + if err != nil { + glog.V(0).Infof("starting from %v", startFrom) + } else { + startFrom = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resuming from %v", startFrom) + } + } else { + startFrom = time.Now().Add(-timeAgo) + glog.V(0).Infof("start time is set to %v", startFrom) + } + + // create filer sink + filerSource := &source.FilerSource{} + filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler) + dataSink.SetSourceFiler(filerSource) + + processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) + + return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "backup_" + dataSink.GetName(), + PathPrefix: sourcePath, + SinceNs: startFrom.UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + var counter int64 + var lastWriteTime time.Time + for { + resp, listenErr := stream.Recv() + + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + return fmt.Errorf("processEventFn: %v", err) + } + + counter++ + if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { + glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) + counter = 0 + lastWriteTime = time.Now() + if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil { + return fmt.Errorf("setOffset: %v", err) + } + } + + } + + }) + +} diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go new file mode 100644 index 000000000..ba0b44659 --- /dev/null +++ b/weed/command/filer_meta_backup.go @@ -0,0 +1,268 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/spf13/viper" + "google.golang.org/grpc" + "io" + "reflect" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + metaBackup FilerMetaBackupOptions +) + +type FilerMetaBackupOptions struct { + grpcDialOption grpc.DialOption + filerAddress *string + filerDirectory *string + restart *bool + backupFilerConfig *string + + store filer.FilerStore +} + +func init() { + cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle + metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port") + 
metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer") + metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup") + metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store") +} + +var cmdFilerMetaBackup = &Command{ + UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml", + Short: "continuously back up filer metadata changes to another filer store specified in a backup_filer.toml", + Long: `continuously back up filer metadata changes. +The backup writes to another filer store specified in a backup_filer.toml. + + weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" + weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart + + `, +} + +func runFilerMetaBackup(cmd *Command, args []string) bool { + + metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + + // load backup_filer.toml + v := viper.New() + v.SetConfigFile(*metaBackup.backupFilerConfig) + + if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file + glog.Fatalf("Failed to load %s file.\nPlease use this command to generate a %s.toml file\n"+ + " weed scaffold -config=%s -output=.\n\n\n", + *metaBackup.backupFilerConfig, "backup_filer", "filer") + } + + if err := metaBackup.initStore(v); err != nil { + glog.V(0).Infof("init backup filer store: %v", err) + return true + } + + missingPreviousBackup := false + _, err := metaBackup.getOffset() + if err != nil { + missingPreviousBackup = true + } + + if *metaBackup.restart || missingPreviousBackup { + glog.V(0).Infof("traversing metadata tree...") + startTime := time.Now() + if err := metaBackup.traverseMetadata(); err != nil { + glog.Errorf("traverse metadata: %v", err) + return true + } + glog.V(0).Infof("metadata copied up to %v", startTime) + if err := metaBackup.setOffset(startTime); err != nil { + startTime = time.Now() + } + } + + for { + err := metaBackup.streamMetadataBackup() + if err != nil { + glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err) + time.Sleep(1747 * time.Millisecond) + } + } + + return true +} + +func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error { + // load configuration for default filer store + hasDefaultStoreConfigured := false + for _, store := range filer.Stores { + if v.GetBool(store.GetName() + ".enabled") { + store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore) + if err := store.Initialize(v, store.GetName()+"."); err != nil { + glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err) + } + glog.V(0).Infof("configured filer store to %s", store.GetName()) + hasDefaultStoreConfigured = true + metaBackup.store = filer.NewFilerStoreWrapper(store) + break + } + } + if !hasDefaultStoreConfigured { + return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed()) + } + + return nil +} + +func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) { + var saveErr error + + traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) { + + println("+", parentPath.Child(entry.Name)) + if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil
{ + saveErr = fmt.Errorf("insert entry error: %v\n", err) + return + } + + }) + + if traverseErr != nil { + return fmt.Errorf("traverse: %v", traverseErr) + } + return saveErr +} + +var ( + MetaBackupKey = []byte("metaBackup") +) + +func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { + + startTime, err := metaBackup.getOffset() + if err != nil { + startTime = time.Now() + } + glog.V(0).Infof("streaming from %v", startTime) + + store := metaBackup.store + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + + ctx := context.Background() + message := resp.EventNotification + + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) + return store.InsertEntry(ctx, entry) + } + if message.OldEntry != nil && message.NewEntry == nil { + println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) + return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)) + } + if message.OldEntry != nil && message.NewEntry != nil { + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) + return store.UpdateEntry(ctx, entry) + } + println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) + if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil { + return err + } + println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry)) + } + + return nil + } + + tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "meta_backup", + PathPrefix: *metaBackup.filerDirectory, + SinceNs: startTime.UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + var counter int64 + var lastWriteTime time.Time + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + if err = eachEntryFunc(resp); err != nil { + return err + } + + counter++ + if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { + glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) + counter = 0 + lastWriteTime = time.Now() + if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil { + return err2 + } + } + + } + + }) + return tailErr +} + +func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) { + value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey) + if err != nil { + return + } + tsNs := util.BytesToUint64(value) + + return time.Unix(0, int64(tsNs)), nil +} + +func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error { + valueBuf := make([]byte, 8) + util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano())) + + if err := metaBackup.store.KvPut(context.Background(), 
MetaBackupKey, valueBuf); err != nil { + return err + } + return nil +} + +var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) + +func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return fn(client) + }) + +} + +func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index f055b19a8..8451ffd78 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -23,9 +23,9 @@ func init() { } var cmdFilerMetaTail = &Command{ - UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]", - Short: "see recent changes on a filer", - Long: `See recent changes on a filer. + UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]", + Short: "see continuous changes on a filer", + Long: `See continuous changes on a filer. weed filer.meta.tail -timeAgo=30h | grep truncate weed filer.meta.tail -timeAgo=30h | jq . @@ -36,7 +36,7 @@ var cmdFilerMetaTail = &Command{ var ( tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port") - tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") + tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer") tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>") diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index e8c06b208..885c95540 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } } - var dataSink sink.ReplicationSink - for _, sk := range sink.Sinks { - if config.GetBool("sink." + sk.GetName() + ".enabled") { - if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { - glog.Fatalf("Failed to initialize sink for %s: %+v", - sk.GetName(), err) - } - glog.V(0).Infof("Configure sink to %s", sk.GetName()) - dataSink = sk - break - } - } + dataSink := findSink(config) if dataSink == nil { println("no data sink configured in replication.toml:") @@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool { } +func findSink(config *util.ViperProxy) sink.ReplicationSink { + var dataSink sink.ReplicationSink + for _, sk := range sink.Sinks { + if config.GetBool("sink." 
+ sk.GetName() + ".enabled") { + if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { + glog.Fatalf("Failed to initialize sink for %s: %+v", + sk.GetName(), err) + } + glog.V(0).Infof("Configure sink to %s", sk.GetName()) + dataSink = sk + break + } + } + return dataSink +} + func validateOneEnabledInput(config *util.ViperProxy) { enabledInput := "" for _, input := range sub.NotificationInputs { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index f8beb6fda..0f34e5701 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication" + "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" @@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // if first time, start from now // if has previously synced, resume from that point of time - sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature) + sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature) if err != nil { return err } @@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) + persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - - var sourceOldKey, sourceNewKey util.FullPath - if message.OldEntry != nil { - sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) - } - if message.NewEntry != nil { - sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) - } - for _, sig := range message.Signatures { if sig == targetFilerSignature && targetFilerSignature != 0 { fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message) return nil } } - if debug { - fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature) - } - - if !strings.HasPrefix(resp.Directory, sourcePath) { - return nil - } - - // handle deletions - if message.OldEntry != nil && message.NewEntry == nil { - if !strings.HasPrefix(string(sourceOldKey), sourcePath) { - return nil - } - key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) - } - - // handle new entries - if message.OldEntry == nil && message.NewEntry != nil { - if !strings.HasPrefix(string(sourceNewKey), sourcePath) { - return nil - } - key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) - } - - // this is something special? 
- if message.OldEntry == nil && message.NewEntry == nil { - return nil - } - - // handle updates - if strings.HasPrefix(string(sourceOldKey), sourcePath) { - // old key is in the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { - // new key is also in the watched directory - oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) - foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) - if foundExisting { - return err - } - - // not able to find old entry - if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { - return fmt.Errorf("delete old entry %v: %v", oldKey, err) - } - - // create the new entry - newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures) - - } else { - // new key is outside of the watched directory - key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) - } - } else { - // old key is outside of the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { - // new key is in the watched directory - key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) - } else { - // new key is also outside of the watched directory - // skip - } - } - - return nil + return persistEventFn(resp) } return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) counter = 0 lastWriteTime = time.Now() - if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil { + if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil { return err } } @@ -290,11 +215,11 @@ const ( SyncKeyPrefix = "sync." 
) -func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) { +func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - syncKey := []byte(SyncKeyPrefix + "____") - util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) + syncKey := []byte(signaturePrefix + "____") + util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey}) if err != nil { @@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature } -func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error { +func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error { return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - syncKey := []byte(SyncKeyPrefix + "____") - util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) + syncKey := []byte(signaturePrefix + "____") + util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) valueBuf := make([]byte, 8) util.Uint64toBytes(valueBuf, uint64(offsetTsNs)) @@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur }) } + +func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { + // process function + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + + var sourceOldKey, sourceNewKey util.FullPath + if message.OldEntry != nil { + sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) + } + if message.NewEntry != nil { + sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) + } + + if debug { + glog.V(0).Infof("received %v", resp) + } + + if !strings.HasPrefix(resp.Directory, sourcePath) { + return nil + } + + // handle deletions + if message.OldEntry != nil && message.NewEntry == nil { + if !strings.HasPrefix(string(sourceOldKey), sourcePath) { + return nil + } + key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + + // handle new entries + if message.OldEntry == nil && message.NewEntry != nil { + if !strings.HasPrefix(string(sourceNewKey), sourcePath) { + return nil + } + key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(key, message.NewEntry, message.Signatures) + } + + // this is something special? 
+ if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + + // handle updates + if strings.HasPrefix(string(sourceOldKey), sourcePath) { + // old key is in the watched directory + if strings.HasPrefix(string(sourceNewKey), sourcePath) { + // new key is also in the watched directory + if !dataSink.IsIncremental() { + oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) + message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) + foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) + if foundExisting { + return err + } + + // not able to find old entry + if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { + return fmt.Errorf("delete old entry %v: %v", oldKey, err) + } + } + // create the new entry + newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures) + + } else { + // new key is outside of the watched directory + if !dataSink.IsIncremental() { + key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + } + } else { + // old key is outside of the watched directory + if strings.HasPrefix(string(sourceNewKey), sourcePath) { + // new key is in the watched directory + key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(key, message.NewEntry, message.Signatures) + } else { + // new key is also outside of the watched directory + // skip + } + } + + return nil + } + return processEventFn +} + +func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string { + if !dataSink.IsIncremental() { + return util.Join(targetPath, string(sourceKey)[len(sourcePath):]) + } + var mTime int64 + if message.NewEntry != nil { + mTime = message.NewEntry.Attributes.Mtime + } else if message.OldEntry != nil { + mTime = message.OldEntry.Attributes.Mtime + } + dateKey := time.Unix(mTime, 0).Format("2006-01-02") + return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):]) +} diff --git a/weed/command/master.go b/weed/command/master.go index d569919cd..fb58cfefd 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -138,7 +138,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } - // Create your protocol servers. 
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) protobuf.RegisterRaftServer(grpcS, raftServer) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index a6d562d40..8da69d0ac 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -53,7 +53,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { filer := *option.filer // parse filer grpc address - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer) if err != nil { glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) return true @@ -63,16 +63,23 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + for i := 0; i < 10; i++ { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) + } + cipher = resp.Cipher + return nil + }) if err != nil { - return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) + glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) + glog.V(0).Infof("wait for %d seconds ...", i+1) + time.Sleep(time.Duration(i+1)*time.Second) } - cipher = resp.Cipher - return nil - }) + } if err != nil { - glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) + glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err) return true } diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index b4b5855ff..db0b4148d 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -63,7 +63,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer) if err != nil { glog.Fatal(err) return false diff --git a/weed/command/s3.go b/weed/command/s3.go index d8e3e306b..c8292a7d5 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -137,7 +137,7 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer) if err != nil { glog.Fatal(err) return false diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 993391a42..07d448042 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -356,6 +356,9 @@ directory = "/buckets" [sink.local] enabled = false directory = "/data" +# all replicated files are under modified time as yyyy-mm-dd directories +# so each date directory contains all new and updated files. 
+is_incremental = false [sink.local_incremental] # all replicated files are under modified time as yyyy-mm-dd directories @@ -373,6 +376,7 @@ directory = "/backup" replication = "" collection = "" ttlSec = 0 +is_incremental = false [sink.s3] # read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html @@ -384,6 +388,7 @@ region = "us-east-2" bucket = "your_bucket_name" # an existing bucket directory = "/" # destination directory endpoint = "" +is_incremental = false [sink.google_cloud_storage] # read credentials doc at https://cloud.google.com/docs/authentication/getting-started @@ -391,6 +396,7 @@ enabled = false google_application_credentials = "/path/to/x.json" # path to json credential file bucket = "your_bucket_seaweedfs" # an existing bucket directory = "/" # destination directory +is_incremental = false [sink.azure] # experimental, let me know if it works @@ -399,6 +405,7 @@ account_name = "" account_key = "" container = "mycontainer" # an existing container directory = "/" # destination directory +is_incremental = false [sink.backblaze] enabled = false @@ -406,6 +413,7 @@ b2_account_id = "" b2_master_application_key = "" bucket = "mybucket" # an existing bucket directory = "/" # destination directory +is_incremental = false ` @@ -432,22 +440,28 @@ expires_after_seconds = 10 # seconds # the host name is not checked, so the PERM files can be shared. [grpc] ca = "" +# Set wildcard domain for enable TLS authentication by common names +allowed_wildcard_domain = "" # .mycompany.com [grpc.volume] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names [grpc.master] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names [grpc.filer] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names [grpc.msg_broker] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names # use this for any place needs a grpc client # i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload" @@ -455,7 +469,6 @@ key = "" cert = "" key = "" - # volume server https options # Note: work in progress! # this does not work with other clients, e.g., "weed filer|mount" etc, yet. diff --git a/weed/command/server.go b/weed/command/server.go index 611578953..a39802412 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -111,6 +111,7 @@ func init() { serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. 
precludes --memprofile and --cpuprofile") serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") + serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<experimental> enable tcp port") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") @@ -156,19 +157,21 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) - peers := strings.Join(peerList, ",") - masterOptions.peers = &peers + if *isStartingMasterServer { + _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) + peers := strings.Join(peerList, ",") + masterOptions.peers = &peers + } // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = &peers + filerOptions.masters = masterOptions.peers filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp - serverOptions.v.masters = &peers + serverOptions.v.masters = masterOptions.peers serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack diff --git a/weed/command/volume.go b/weed/command/volume.go index 659c93d96..f49ece9dc 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -62,6 +62,7 @@ type VolumeServerOptions struct { preStopSeconds *int metricsHttpPort *int // pulseSeconds *int + enableTcp *bool } func init() { @@ -88,6 +89,7 @@ func init() { v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers.
precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") + v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port") } var cmdVolume = &Command{ @@ -251,6 +253,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } } + // starting tcp server + if *v.enableTcp { + go v.startTcpService(volumeServer) + } + // starting the cluster http server clusterHttpServer := v.startClusterHttpService(volumeMux) @@ -368,3 +375,22 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd }() return clusterHttpServer } + +func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) { + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000) + glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress) + listener, e := util.NewListener(listeningAddress, 0) + if e != nil { + glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e) + } + defer listener.Close() + + for { + c, err := listener.Accept() + if err != nil { + fmt.Println(err) + return + } + go volumeServer.HandleTcpConnection(c) + } +} diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 2bd4a3c61..781ea1e36 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -78,7 +78,7 @@ func (wo *WebDavOption) startWebDav() bool { } // parse filer grpc address - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer) if err != nil { glog.Fatal(err) return false diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 50a669f40..bedf2f4d1 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -11,6 +11,10 @@ import ( type HardLinkId []byte +const ( + MsgFailDelNonEmptyFolder = "fail to delete non-empty folder" +) + func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) { if p == "/" { return nil @@ -77,7 +81,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry if lastFileName == "" && !isRecursive && len(entries) > 0 { // only for first iteration in the loop glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) - return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) + return nil, nil, fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath) } for _, sub := range entries { diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index 295a5039e..b29324b61 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -11,6 +11,10 @@ import ( // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { + f.maybeReloadFilerConfiguration(event) +} + +func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) { if DirectoryEtcSeaweedFS != event.Directory { if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath { return @@ -26,7 +30,6 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) if entry.Name ==
FilerConfName { f.reloadFilerConfiguration(entry) } - } func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 9437e9992..5c368a57e 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -69,6 +69,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string peerSignature, err = ma.readFilerStoreSignature(peer) } + // when filer store is not shared by multiple filers if peerSignature != f.Signature { if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil { lastTsNs = prevTsNs diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 075204b79..573ab65e8 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -15,7 +15,7 @@ import ( func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - // fmt.Printf("start to stream content for chunks: %+v\n", chunks) + glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) fileId2Url := make(map[string][]string) @@ -26,6 +26,9 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err + } else if len(urlStrings) == 0 { + glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) + return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) } fileId2Url[chunkView.FileId] = urlStrings } @@ -39,6 +42,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c glog.Errorf("read chunk: %v", err) return fmt.Errorf("read chunk: %v", err) } + _, err = w.Write(data) if err != nil { glog.Errorf("write chunk: %v", err) @@ -181,7 +185,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 10a0a2b44..33e1a0a3a 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -251,10 +251,10 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) - - fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) dirPath := util.FullPath(dir.FullPath()) + glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String()) + + fullFilePath := dirPath.Child(req.Name) visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) if visitErr != nil { glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) @@ -305,7 +305,8 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. 
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath()) + dirPath := util.FullPath(dir.FullPath()) + glog.V(4).Infof("dir ReadDirAll %s", dirPath) processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { if entry.IsDirectory { @@ -318,12 +319,11 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { return nil } - dirPath := util.FullPath(dir.FullPath()) if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) return nil, fuse.EIO } - listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { processEachEntryFn(entry.ToProtoEntry(), false) return true }) @@ -389,12 +389,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { // clear entry inside the file fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath) + dir.wfs.fsNodeCache.DeleteFsNode(filePath) if fsNode != nil { if file, ok := fsNode.(*File); ok { file.clearEntry() } } - dir.wfs.fsNodeCache.DeleteFsNode(filePath) // remove current file handle if any dir.wfs.handlesLock.Lock() diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index ba3280f03..606e52fcb 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -35,15 +35,20 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f return nil, err } + oldEntry := oldFile.getEntry() + if oldEntry == nil { + return nil, fuse.EIO + } + // update old file to hardlink mode - if len(oldFile.entry.HardLinkId) == 0 { - oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER) - oldFile.entry.HardLinkCounter = 1 + if len(oldEntry.HardLinkId) == 0 { + oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER) + oldEntry.HardLinkCounter = 1 } - oldFile.entry.HardLinkCounter++ + oldEntry.HardLinkCounter++ updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ Directory: oldFile.dir.FullPath(), - Entry: oldFile.entry, + Entry: oldEntry, Signatures: []int32{dir.wfs.signature}, } @@ -53,11 +58,11 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f Entry: &filer_pb.Entry{ Name: req.NewName, IsDirectory: false, - Attributes: oldFile.entry.Attributes, - Chunks: oldFile.entry.Chunks, - Extended: oldFile.entry.Extended, - HardLinkId: oldFile.entry.HardLinkId, - HardLinkCounter: oldFile.entry.HardLinkCounter, + Attributes: oldEntry.Attributes, + Chunks: oldEntry.Chunks, + Extended: oldEntry.Extended, + HardLinkId: oldEntry.HardLinkId, + HardLinkCounter: oldEntry.HardLinkCounter, }, Signatures: []int32{dir.wfs.signature}, } @@ -83,6 +88,10 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f return nil }) + if err != nil { + return nil, fuse.EIO + } + // create new file node newNode := dir.newFile(req.NewName, request.Entry) newFile := newNode.(*File) diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index f05a3a56a..8888cff96 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -30,7 +30,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { - glog.V(4).Infof("%s AddPage [%d,%d) of %d 
bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) + glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. @@ -69,7 +69,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD return false } - fileSize := int64(pages.f.entry.Attributes.FileSize) + entry := pages.f.getEntry() + if entry == nil { + return false + } + + fileSize := int64(entry.Attributes.FileSize) chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) if chunkSize == 0 { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index a210c5152..5931dd2ff 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -5,6 +5,7 @@ import ( "io" "os" "sort" + "sync" "time" "github.com/seaweedfs/fuse" @@ -33,6 +34,7 @@ type File struct { dir *Dir wfs *WFS entry *filer_pb.Entry + entryLock sync.RWMutex entryViewCache []filer.VisibleInterval isOpen int reader io.ReaderAt @@ -47,7 +49,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr) - entry := file.entry + entry := file.getEntry() if file.isOpen <= 0 || entry == nil { if entry, err = file.maybeLoadEntry(ctx); err != nil { return err @@ -106,7 +108,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req) - _, err := file.maybeLoadEntry(ctx) + entry, err := file.maybeLoadEntry(ctx) if err != nil { return err } @@ -123,12 +125,12 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f if req.Valid.Size() { - glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks)) - if req.Size < filer.FileSize(file.entry) { + glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks)) + if req.Size < filer.FileSize(entry) { // fmt.Printf("truncate %v \n", fullPath) var chunks []*filer_pb.FileChunk var truncatedChunks []*filer_pb.FileChunk - for _, chunk := range file.entry.Chunks { + for _, chunk := range entry.Chunks { int64Size := int64(chunk.Size) if chunk.Offset+int64Size > int64(req.Size) { // this chunk is truncated @@ -143,36 +145,36 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } } - file.entry.Chunks = chunks + entry.Chunks = chunks file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks) file.reader = nil } - file.entry.Attributes.FileSize = req.Size + entry.Attributes.FileSize = req.Size file.dirtyMetadata = true } if req.Valid.Mode() { - file.entry.Attributes.FileMode = uint32(req.Mode) + entry.Attributes.FileMode = uint32(req.Mode) file.dirtyMetadata = true } if req.Valid.Uid() { - file.entry.Attributes.Uid = req.Uid + entry.Attributes.Uid = req.Uid file.dirtyMetadata = true } if req.Valid.Gid() { - file.entry.Attributes.Gid = req.Gid + entry.Attributes.Gid = req.Gid file.dirtyMetadata = true } if req.Valid.Crtime() { - file.entry.Attributes.Crtime = req.Crtime.Unix() + entry.Attributes.Crtime = req.Crtime.Unix() file.dirtyMetadata = true } if req.Valid.Mtime() { - file.entry.Attributes.Mtime = req.Mtime.Unix() + entry.Attributes.Mtime = req.Mtime.Unix() file.dirtyMetadata = true } @@ -188,7 +190,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp 
*f return nil } - return file.saveEntry(file.entry) + return file.saveEntry(entry) } @@ -258,7 +260,7 @@ func (file *File) Forget() { } func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) { - entry = file.entry + entry = file.getEntry() if file.isOpen > 0 { return entry, nil } @@ -299,8 +301,13 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { } } + entry := file.getEntry() + if entry == nil { + return + } + // pick out-of-order chunks from existing chunks - for _, chunk := range file.entry.Chunks { + for _, chunk := range entry.Chunks { if lessThan(earliestChunk, chunk) { chunks = append(chunks, chunk) } @@ -318,18 +325,22 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { file.reader = nil - glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) + glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks)) - file.entry.Chunks = append(file.entry.Chunks, newChunks...) + entry.Chunks = append(entry.Chunks, newChunks...) } func (file *File) setEntry(entry *filer_pb.Entry) { + file.entryLock.Lock() + defer file.entryLock.Unlock() file.entry = entry file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks) file.reader = nil } func (file *File) clearEntry() { + file.entryLock.Lock() + defer file.entryLock.Unlock() file.entry = nil file.entryViewCache = nil file.reader = nil @@ -359,3 +370,9 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error { return nil }) } + +func (file *File) getEntry() *filer_pb.Entry { + file.entryLock.RLock() + defer file.entryLock.RUnlock() + return file.entry +} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index fb073c9cd..25eaf7033 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -40,8 +40,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle { Uid: uid, Gid: gid, } - if fh.f.entry != nil { - fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry) + entry := fh.f.getEntry() + if entry != nil { + entry.Attributes.FileSize = filer.FileSize(entry) } return fh @@ -104,22 +105,28 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxSto func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { - fileSize := int64(filer.FileSize(fh.f.entry)) + entry := fh.f.getEntry() + if entry == nil { + return 0, io.EOF + } + + fileSize := int64(filer.FileSize(entry)) + fileFullPath := fh.f.fullpath() if fileSize == 0 { - glog.V(1).Infof("empty fh %v", fh.f.fullpath()) + glog.V(1).Infof("empty fh %v", fileFullPath) return 0, io.EOF } - if offset+int64(len(buff)) <= int64(len(fh.f.entry.Content)) { - totalRead := copy(buff, fh.f.entry.Content[offset:]) - glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead) + if offset+int64(len(buff)) <= int64(len(entry.Content)) { + totalRead := copy(buff, entry.Content[offset:]) + glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), nil } var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks) + fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve 
chunk manifest: %v", chunkResolveErr) } @@ -136,10 +143,10 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { totalRead, err := reader.ReadAt(buff, offset) if err != nil && err != io.EOF { - glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) + glog.Errorf("file handle read %s: %v", fileFullPath, err) } - glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err) + glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) return int64(totalRead), err } @@ -158,8 +165,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f copy(data, req.Data) } - fh.f.entry.Content = nil - fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) + entry := fh.f.getEntry() + if entry == nil { + return fuse.EIO + } + + entry.Content = nil + entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize))) glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) fh.dirtyPages.AddPage(req.Offset, data) @@ -242,35 +254,40 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - if fh.f.entry.Attributes != nil { - fh.f.entry.Attributes.Mime = fh.contentType - if fh.f.entry.Attributes.Uid == 0 { - fh.f.entry.Attributes.Uid = header.Uid + entry := fh.f.getEntry() + if entry == nil { + return nil + } + + if entry.Attributes != nil { + entry.Attributes.Mime = fh.contentType + if entry.Attributes.Uid == 0 { + entry.Attributes.Uid = header.Uid } - if fh.f.entry.Attributes.Gid == 0 { - fh.f.entry.Attributes.Gid = header.Gid + if entry.Attributes.Gid == 0 { + entry.Attributes.Gid = header.Gid } - if fh.f.entry.Attributes.Crtime == 0 { - fh.f.entry.Attributes.Crtime = time.Now().Unix() + if entry.Attributes.Crtime == 0 { + entry.Attributes.Crtime = time.Now().Unix() } - fh.f.entry.Attributes.Mtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) - fh.f.entry.Attributes.Collection = fh.dirtyPages.collection - fh.f.entry.Attributes.Replication = fh.dirtyPages.replication + entry.Attributes.Mtime = time.Now().Unix() + entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) + entry.Attributes.Collection = fh.dirtyPages.collection + entry.Attributes.Replication = fh.dirtyPages.replication } request := &filer_pb.CreateEntryRequest{ Directory: fh.f.dir.FullPath(), - Entry: fh.f.entry, + Entry: entry, Signatures: []int32{fh.f.wfs.signature}, } - glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) - for i, chunk := range fh.f.entry.Chunks { + glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks)) + for i, chunk := range entry.Chunks { glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) } - manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) + manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) @@ -278,7 
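The `readFromChunks` change above also keeps the inlined-content fast path: small files carry their bytes in `entry.Content`, so a read that fits entirely inside that slice is served with a `copy` and never touches a volume server. A simplified sketch (the signature is reduced for illustration):

    package main

    import "fmt"

    // readInline serves the request from the inlined content when it fits,
    // otherwise signals the caller to fall back to chunk-based reading.
    func readInline(content, buff []byte, offset int64) (int64, bool) {
    	if offset+int64(len(buff)) <= int64(len(content)) {
    		return int64(copy(buff, content[offset:])), true
    	}
    	return 0, false // fall back to resolving chunk views
    }

    func main() {
    	content := []byte("hello world")
    	buff := make([]byte, 5)
    	if n, ok := readInline(content, buff, 6); ok {
    		fmt.Println(n, string(buff)) // 5 world
    	}
    }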
+295,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", manifestErr) } - fh.f.entry.Chunks = append(chunks, manifestChunks...) + entry.Chunks = append(chunks, manifestChunks...) fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry) defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry) diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go index fdec8253c..6b1012090 100644 --- a/weed/filesys/fscache.go +++ b/weed/filesys/fscache.go @@ -124,8 +124,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { } if f, ok := src.node.(*File); ok { f.Name = target.name - if f.entry != nil { - f.entry.Name = f.Name + entry := f.getEntry() + if entry != nil { + entry.Name = f.Name } } parent.disconnectChild(target) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 70428bb07..9957a04cd 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -31,6 +31,7 @@ type UploadResult struct { Mime string `json:"mime,omitempty"` Gzip uint32 `json:"gzip,omitempty"` ContentMd5 string `json:"contentMd5,omitempty"` + RetryCount int `json:"-"` } func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { @@ -96,6 +97,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by for i := 0; i < 3; i++ { uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) if err == nil { + uploadResult.RetryCount = i return } else { glog.Warningf("uploading to %s: %v", uploadUrl, err) diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 079fbd671..e4d8bee34 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "os" "strings" "time" @@ -101,12 +102,16 @@ func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string, } func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { - + // Redundancy limit to make it correctly judge whether it is the last file. 
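The `retriedUploadData` hunk above records the loop index as `RetryCount` on success, so the filer can later export it as a retry metric. A hedged sketch of that accounting, where `doUpload` stands in for the real HTTP upload call:

    package main

    import (
    	"errors"
    	"fmt"
    )

    type UploadResult struct {
    	RetryCount int // failed attempts that preceded the success
    }

    // retriedUpload retries up to 3 times, like retriedUploadData, and stamps
    // the attempt index on the successful result.
    func retriedUpload(doUpload func() (*UploadResult, error)) (*UploadResult, error) {
    	var lastErr error
    	for i := 0; i < 3; i++ {
    		result, err := doUpload()
    		if err == nil {
    			result.RetryCount = i
    			return result, nil
    		}
    		lastErr = err
    	}
    	return nil, lastErr
    }

    func main() {
    	attempts := 0
    	result, _ := retriedUpload(func() (*UploadResult, error) {
    		attempts++
    		if attempts == 1 {
    			return nil, errors.New("transient failure")
    		}
    		return &UploadResult{}, nil
    	})
    	fmt.Println(result.RetryCount) // 1
    }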
+ redLimit := limit + if limit != math.MaxInt32 && limit != 0{ + redLimit = limit + 1 + } request := &ListEntriesRequest{ Directory: string(fullDirPath), Prefix: prefix, StartFromFileName: startFrom, - Limit: limit, + Limit: redLimit, InclusiveStartFrom: inclusive, } @@ -119,6 +124,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix } var prevEntry *Entry + count := 0 for { resp, recvErr := stream.Recv() if recvErr != nil { @@ -139,6 +145,10 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix } } prevEntry = resp.Entry + count++ + if count > int(limit) && limit != 0 { + prevEntry = nil + } } return nil diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 910114313..9efcd9bdc 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -3,6 +3,7 @@ package pb import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "net/http" "strconv" "strings" @@ -108,51 +109,55 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts } func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) { - colonIndex := strings.LastIndex(server, ":") - if colonIndex < 0 { - return "", fmt.Errorf("server should have hostname:port format: %v", server) - } + return ParseServerAddress(server, 10000) +} - port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64) +func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { + + host, port, parseErr := hostAndPort(server) if parseErr != nil { return "", fmt.Errorf("server port parse error: %v", parseErr) } - grpcPort := int(port) + 10000 + newPort := int(port) + deltaPort - return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil + return fmt.Sprintf("%s:%d", host, newPort), nil } -func ServerToGrpcAddress(server string) (serverGrpcAddress string) { - hostnameAndPort := strings.Split(server, ":") - if len(hostnameAndPort) != 2 { - return fmt.Sprintf("unexpected server address: %s", server) +func hostAndPort(address string) (host string, port uint64, err error) { + colonIndex := strings.LastIndex(address, ":") + if colonIndex < 0 { + return "", 0, fmt.Errorf("server should have hostname:port format: %v", address) + } + port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64) + if err != nil { + return "", 0, fmt.Errorf("server port parse error: %v", err) } - port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + return address[:colonIndex], port, err +} + +func ServerToGrpcAddress(server string) (serverGrpcAddress string) { + + host, port, parseErr := hostAndPort(server) if parseErr != nil { - return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1]) + glog.Fatalf("server address %s parse error: %v", server, parseErr) } grpcPort := int(port) + 10000 - return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort) + return fmt.Sprintf("%s:%d", host, grpcPort) } func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { - hostnameAndPort := strings.Split(grpcAddress, ":") - if len(hostnameAndPort) != 2 { - return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress) - } - - grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + host, grpcPort, parseErr := hostAndPort(grpcAddress) if parseErr != nil { - return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1]) + glog.Fatalf("server grpc address %s parse error: %v", grpcAddress, parseErr) } 
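The "redundancy limit" in `doSeaweedList` above over-fetches by one entry so the caller can tell whether the page it returns is the final one. A self-contained sketch of the idea, with `fetch` standing in for the `ListEntries` gRPC stream:

    package main

    import "fmt"

    // listPage asks for limit+1 entries, hands back at most limit, and uses
    // the presence of the probe entry to decide whether this page is the last.
    func listPage(fetch func(n int) []string, limit int) (page []string, isLast bool) {
    	results := fetch(limit + 1)
    	if len(results) <= limit {
    		return results, true // nothing exists beyond this page
    	}
    	return results[:limit], false // drop the probe entry
    }

    func main() {
    	entries := []string{"a", "b", "c"}
    	fetch := func(n int) []string {
    		if n > len(entries) {
    			n = len(entries)
    		}
    		return entries[:n]
    	}
    	page, isLast := listPage(fetch, 2)
    	fmt.Println(page, isLast) // [a b] false
    	page, isLast = listPage(fetch, 3)
    	fmt.Println(page, isLast) // [a b c] true
    }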
port := int(grpcPort) - 10000 - return fmt.Sprintf("%s:%d", hostnameAndPort[0], port) + return fmt.Sprintf("%s:%d", host, port) } func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { @@ -197,19 +202,3 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption }, filerGrpcAddress, grpcDialOption) } - -func ParseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { - hostnameAndPort := strings.Split(filer, ":") - if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) - } - - filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) - if parseErr != nil { - return "", fmt.Errorf("filer port parse error: %v", parseErr) - } - - filerGrpcPort := int(filerPort) + 10000 - - return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil -} diff --git a/weed/pb/volume_info.go b/weed/pb/volume_info.go index c4f733f5c..cae9e018f 100644 --- a/weed/pb/volume_info.go +++ b/weed/pb/volume_info.go @@ -15,40 +15,49 @@ import ( ) // MaybeLoadVolumeInfo load the file data as *volume_server_pb.VolumeInfo, the returned volumeInfo will not be nil -func MaybeLoadVolumeInfo(fileName string) (*volume_server_pb.VolumeInfo, bool, error) { +func MaybeLoadVolumeInfo(fileName string) (volumeInfo *volume_server_pb.VolumeInfo, hasRemoteFile bool, hasVolumeInfoFile bool, err error) { - volumeInfo := &volume_server_pb.VolumeInfo{} + volumeInfo = &volume_server_pb.VolumeInfo{} glog.V(1).Infof("maybeLoadVolumeInfo checks %s", fileName) if exists, canRead, _, _, _ := util.CheckFile(fileName); !exists || !canRead { if !exists { - return volumeInfo, false, nil + return } + hasVolumeInfoFile = true if !canRead { glog.Warningf("can not read %s", fileName) - return volumeInfo, false, fmt.Errorf("can not read %s", fileName) + err = fmt.Errorf("can not read %s", fileName) + return } - return volumeInfo, false, nil + return } + hasVolumeInfoFile = true + glog.V(1).Infof("maybeLoadVolumeInfo reads %s", fileName) tierData, readErr := ioutil.ReadFile(fileName) if readErr != nil { glog.Warningf("fail to read %s : %v", fileName, readErr) - return volumeInfo, false, fmt.Errorf("fail to read %s : %v", fileName, readErr) + err = fmt.Errorf("fail to read %s : %v", fileName, readErr) + return + } glog.V(1).Infof("maybeLoadVolumeInfo Unmarshal volume info %v", fileName) - if err := jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil { + if err = jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil { glog.Warningf("unmarshal error: %v", err) - return volumeInfo, false, fmt.Errorf("unmarshal error: %v", err) + err = fmt.Errorf("unmarshal error: %v", err) + return } if len(volumeInfo.GetFiles()) == 0 { - return volumeInfo, false, nil + return } - return volumeInfo, true, nil + hasRemoteFile = true + + return } func SaveVolumeInfo(fileName string, volumeInfo *volume_server_pb.VolumeInfo) error { diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_util.go index 3514c6977..f642bb801 100644 --- a/weed/replication/repl_util/replication_utli.go +++ b/weed/replication/repl_util/replication_util.go @@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer var shouldRetry bool for _, fileUrl := range fileUrls { - shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), 
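The new `hostAndPort` helper above splits on the *last* colon, so addresses with more than one colon no longer trip the old `strings.Split` length check, and all three parse paths share one implementation. A runnable copy for illustration:

    package main

    import (
    	"fmt"
    	"strconv"
    	"strings"
    )

    // hostAndPort splits "host:port" at the last colon and parses the port.
    func hostAndPort(address string) (host string, port uint64, err error) {
    	colonIndex := strings.LastIndex(address, ":")
    	if colonIndex < 0 {
    		return "", 0, fmt.Errorf("server should have hostname:port format: %v", address)
    	}
    	port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64)
    	return address[:colonIndex], port, err
    }

    func main() {
    	host, port, _ := hostAndPort("localhost:8888")
    	// SeaweedFS derives gRPC ports from HTTP ports by a fixed +10000 delta.
    	fmt.Printf("%s:%d\n", host, port+10000) // localhost:18888
    }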
func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 7688029e6..d7e609c68 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -42,7 +42,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p return nil } var dateKey string - if r.sink.GetName() == "local_incremental" { + if r.sink.IsIncremental() { var mTime int64 if message.NewEntry != nil { mTime = message.NewEntry.Attributes.Mtime diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index df70be64b..d13a1049b 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -18,10 +18,11 @@ import ( ) type AzureSink struct { - containerURL azblob.ContainerURL - container string - dir string - filerSource *source.FilerSource + containerURL azblob.ContainerURL + container string + dir string + filerSource *source.FilerSource + isIncremental bool } func init() { @@ -36,7 +37,12 @@ func (g *AzureSink) GetSinkToDirectory() string { return g.dir } +func (g *AzureSink) IsIncremental() bool { + return g.isIncremental +} + func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error { + g.isIncremental = configuration.GetBool(prefix + "is_incremental") return g.initialize( configuration.GetString(prefix+"account_name"), configuration.GetString(prefix+"account_key"), diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 24f0ecbbc..90a0bb2e8 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -14,10 +14,11 @@ import ( ) type B2Sink struct { - client *b2.Client - bucket string - dir string - filerSource *source.FilerSource + client *b2.Client + bucket string + dir string + filerSource *source.FilerSource + isIncremental bool } func init() { @@ -32,7 +33,12 @@ func (g *B2Sink) GetSinkToDirectory() string { return g.dir } +func (g *B2Sink) IsIncremental() bool { + return g.isIncremental +} + func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error { + g.isIncremental = configuration.GetBool(prefix + "is_incremental") return g.initialize( configuration.GetString(prefix+"b2_account_id"), configuration.GetString(prefix+"b2_master_application_key"), diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 509f75116..d7c5fccc3 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -30,6 +30,7 @@ type FilerSink struct { grpcDialOption grpc.DialOption address string writeChunkByFiler bool + isIncremental bool } func init() { @@ -44,7 +45,12 @@ func (fs *FilerSink) GetSinkToDirectory() string { return fs.dir } +func (fs *FilerSink) IsIncremental() bool { + return fs.isIncremental +} + func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { + fs.isIncremental = configuration.GetBool(prefix + "is_incremental") return fs.DoInitialize( "", configuration.GetString(prefix+"grpcAddress"), diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index badabc32c..5cf5b7317 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ 
b/weed/replication/sink/gcssink/gcs_sink.go @@ -18,10 +18,11 @@ import ( ) type GcsSink struct { - client *storage.Client - bucket string - dir string - filerSource *source.FilerSource + client *storage.Client + bucket string + dir string + filerSource *source.FilerSource + isIncremental bool } func init() { @@ -36,7 +37,12 @@ func (g *GcsSink) GetSinkToDirectory() string { return g.dir } +func (g *GcsSink) IsIncremental() bool { + return g.isIncremental +} + func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error { + g.isIncremental = configuration.GetBool(prefix + "is_incremental") return g.initialize( configuration.GetString(prefix+"google_application_credentials"), configuration.GetString(prefix+"bucket"), diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go index 21c625c3f..2b9b3e69a 100644 --- a/weed/replication/sink/localsink/local_sink.go +++ b/weed/replication/sink/localsink/local_sink.go @@ -50,6 +50,10 @@ func (localsink *LocalSink) GetSinkToDirectory() string { return localsink.Dir } +func (localsink *LocalSink) IsIncremental() bool { + return true +} + func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { if localsink.isMultiPartEntry(key) { return nil @@ -74,13 +78,13 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa if _, err := os.Stat(dir); os.IsNotExist(err) { glog.V(4).Infof("Create Direcotry key: %s", dir) - if err = os.MkdirAll(dir, 0); err != nil { + if err = os.MkdirAll(dir, 0755); err != nil { return err } } writeFunc := func(data []byte) error { - writeErr := ioutil.WriteFile(key, data, 0) + writeErr := ioutil.WriteFile(key, data, 0755) return writeErr } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index cfc6e0a4d..4ffd09462 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -14,6 +14,7 @@ type ReplicationSink interface { UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) GetSinkToDirectory() string SetSourceFiler(s *source.FilerSource) + IsIncremental() bool } var ( diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 58432ee6b..9a36573e3 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -21,12 +21,13 @@ import ( ) type S3Sink struct { - conn s3iface.S3API - region string - bucket string - dir string - endpoint string - filerSource *source.FilerSource + conn s3iface.S3API + region string + bucket string + dir string + endpoint string + filerSource *source.FilerSource + isIncremental bool } func init() { @@ -41,11 +42,17 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { return s3sink.dir } +func (s3sink *S3Sink) IsIncremental() bool { + return s3sink.isIncremental +} + func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error { glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint")) + glog.V(0).Infof("sink.s3.is_incremental: %v", 
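Each sink above now answers `IsIncremental()`, replacing the brittle `sink.GetName() == "local_incremental"` comparison in the replicator. A trimmed-down sketch of the dispatch (names are illustrative; the real key derivation uses the entry's mtime):

    package main

    import "fmt"

    // ReplicationSink is reduced here to the one capability the patch adds.
    type ReplicationSink interface {
    	IsIncremental() bool
    }

    type s3Sink struct{ isIncremental bool }

    func (s s3Sink) IsIncremental() bool { return s.isIncremental }

    // keyFor prefixes incremental sinks' keys with a date bucket, the way the
    // replicator derives dateKey before writing.
    func keyFor(sink ReplicationSink, dateKey, key string) string {
    	if sink.IsIncremental() {
    		return "/" + dateKey + key
    	}
    	return key
    }

    func main() {
    	fmt.Println(keyFor(s3Sink{isIncremental: true}, "2021-03-01", "/bucket/obj"))
    	fmt.Println(keyFor(s3Sink{isIncremental: false}, "2021-03-01", "/bucket/obj"))
    }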
configuration.GetString(prefix+"is_incremental")) + s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental") return s3sink.initialize( configuration.GetString(prefix+"aws_access_key_id"), configuration.GetString(prefix+"aws_secret_access_key"), @@ -67,8 +74,9 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc s3sink.endpoint = endpoint config := &aws.Config{ - Region: aws.String(s3sink.region), - Endpoint: aws.String(s3sink.endpoint), + Region: aws.String(s3sink.region), + Endpoint: aws.String(s3sink.endpoint), + S3ForcePathStyle: aws.Bool(true), } if awsAccessKeyId != "" && awsSecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") @@ -104,7 +112,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures uploadId, err := s3sink.createMultipartUpload(key, entry) if err != nil { - return err + return fmt.Errorf("createMultipartUpload: %v", err) } totalSize := filer.FileSize(entry) @@ -120,6 +128,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures defer wg.Done() if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { err = uploadErr + glog.Errorf("uploadPart: %v", uploadErr) } else { parts[index] = part } @@ -129,7 +138,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures if err != nil { s3sink.abortMultipartUpload(key, uploadId) - return err + return fmt.Errorf("uploadPart: %v", err) } return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts) diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index b172ea2c3..3dde52616 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -24,7 +24,7 @@ func (s3sink *S3Sink) deleteObject(key string) error { result, err := s3sink.conn.DeleteObject(input) if err == nil { - glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result) + glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result) } else { glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err) } @@ -43,7 +43,7 @@ func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) ( result, err := s3sink.conn.CreateMultipartUpload(input) if err == nil { - glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result) + glog.V(2).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result) } else { glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err) return "", err @@ -94,12 +94,13 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId result, err := s3sink.conn.CompleteMultipartUpload(input) if err == nil { - glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result) + glog.V(2).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result) } else { glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err) + return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err) } - return err + return nil } // To upload a part @@ -122,7 +123,7 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer. 
result, err := s3sink.conn.UploadPart(input) if err == nil { - glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result) + glog.V(2).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result) } else { glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err) } @@ -163,7 +164,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er } buf := make([]byte, chunk.Size) for _, fileUrl := range fileUrls { - _, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf) + _, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf) if err != nil { glog.V(1).Infof("read from %s: %v", fileUrl, err) } else { diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 3982360b0..e2e3575dc 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -83,8 +83,12 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) } - for _, loc := range locations.Locations { - fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part)) + if !fs.proxyByFiler { + for _, loc := range locations.Locations { + fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part)) + } + } else { + fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part)) } return diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 3626ece98..12a74126a 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -31,6 +31,10 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl return nil }, startFrom, inclusive, limit) + if len(entries) == 0 { + isLast = true + } + return } diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 338f82668..48e8cb047 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -51,7 +51,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques var buckets []*s3.Bucket for _, entry := range entries { if entry.IsDirectory { - if identity != nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) { + if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name) { continue } buckets = append(buckets, &s3.Bucket{ diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 19d85c495..610daef9f 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -5,9 +5,11 @@ import ( "encoding/json" "encoding/xml" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "io" "io/ioutil" "net/http" + "net/url" "sort" "strings" @@ -69,7 +71,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } } else { - uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader) @@ -84,6 +86,14 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) writeSuccessResponseEmpty(w) } +func urlPathEscape(object string) string { + var escapedParts []string + for _, part := range 
strings.Split(object, "/") { + escapedParts = append(escapedParts, url.PathEscape(part)) + } + return strings.Join(escapedParts, "/") +} + func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) { bucket, object := getBucketAndObject(r) @@ -94,7 +104,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) s3a.proxyToFiler(w, r, destUrl, passThroughResponse) @@ -105,7 +115,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request bucket, object := getBucketAndObject(r) destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) s3a.proxyToFiler(w, r, destUrl, passThroughResponse) @@ -116,7 +126,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque bucket, object := getBucketAndObject(r) destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) { for k, v := range proxyResponse.Header { @@ -196,6 +206,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h if err == nil { directoriesWithDeletion[parentDirectoryPath]++ deletedObjects = append(deletedObjects, object) + } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { + deletedObjects = append(deletedObjects, object) } else { delete(directoriesWithDeletion, parentDirectoryPath) deleteErrors = append(deleteErrors, DeleteError{ diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index 044e732db..035302ae6 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -110,7 +110,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R } } - uploadUrl := fmt.Sprintf("http://%s%s/%s/%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody) diff --git a/weed/security/tls.go b/weed/security/tls.go index b4bf84e2d..7d3ffcdca 100644 --- a/weed/security/tls.go +++ b/weed/security/tls.go @@ -1,10 +1,16 @@ package security import ( + "context" "crypto/tls" "crypto/x509" "github.com/chrislusf/seaweedfs/weed/util" + grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" "io/ioutil" + "strings" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -12,21 +18,29 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) -func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption { +type Authenticator struct { + AllowedWildcardDomain string + AllowedCommonNames map[string]bool +} + +func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption, grpc.ServerOption) { if config == nil { - return nil + return nil, nil } // load cert/key, ca cert cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), 
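The `urlPathEscape` helper above escapes each path segment separately so the `/` separators survive; running `url.PathEscape` over the whole object key would encode them as `%2F` and point at a different object. A runnable comparison:

    package main

    import (
    	"fmt"
    	"net/url"
    	"strings"
    )

    // urlPathEscape escapes segment by segment, preserving the separators.
    func urlPathEscape(object string) string {
    	var escapedParts []string
    	for _, part := range strings.Split(object, "/") {
    		escapedParts = append(escapedParts, url.PathEscape(part))
    	}
    	return strings.Join(escapedParts, "/")
    }

    func main() {
    	fmt.Println(url.PathEscape("/dir/file name.txt")) // %2Fdir%2Ffile%20name.txt
    	fmt.Println(urlPathEscape("/dir/file name.txt"))  // /dir/file%20name.txt
    }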
config.GetString(component+".key")) if err != nil { - glog.V(1).Infof("load cert/key error: %v", err) - return nil + glog.V(1).Infof("load cert: %s / key: %s error: %v", + config.GetString(component+".cert"), + config.GetString(component+".key"), + err) + return nil, nil } caCert, err := ioutil.ReadFile(config.GetString("grpc.ca")) if err != nil { - glog.V(1).Infof("read ca cert file error: %v", err) - return nil + glog.V(1).Infof("read ca cert file %s error: %v", config.GetString("grpc.ca"), err) + return nil, nil } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) @@ -36,7 +50,20 @@ func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption ClientAuth: tls.RequireAndVerifyClientCert, }) - return grpc.Creds(ta) + allowedCommonNames := config.GetString(component + ".allowed_commonNames") + allowedWildcardDomain := config.GetString("grpc.allowed_wildcard_domain") + if allowedCommonNames != "" || allowedWildcardDomain != "" { + allowedCommonNamesMap := make(map[string]bool) + for _, s := range strings.Split(allowedCommonNames, ",") { + allowedCommonNamesMap[s] = true + } + auther := Authenticator{ + AllowedCommonNames: allowedCommonNamesMap, + AllowedWildcardDomain: allowedWildcardDomain, + } + return grpc.Creds(ta), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(auther.Authenticate)) + } + return grpc.Creds(ta), nil } func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption { @@ -70,3 +97,28 @@ func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption { }) return grpc.WithTransportCredentials(ta) } + +func (a Authenticator) Authenticate(ctx context.Context) (newCtx context.Context, err error) { + p, ok := peer.FromContext(ctx) + if !ok { + return ctx, status.Error(codes.Unauthenticated, "no peer found") + } + + tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo) + if !ok { + return ctx, status.Error(codes.Unauthenticated, "unexpected peer transport credentials") + } + if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 { + return ctx, status.Error(codes.Unauthenticated, "could not verify peer certificate") + } + + commonName := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName + if a.AllowedWildcardDomain != "" && strings.HasSuffix(commonName, a.AllowedWildcardDomain) { + return ctx, nil + } + if _, ok := a.AllowedCommonNames[commonName]; ok { + return ctx, nil + } + + return ctx, status.Errorf(codes.Unauthenticated, "invalid subject common name: %s", commonName) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 22474a5e2..9e0770afa 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -153,7 +153,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) func (fs *FilerServer) checkWithMaster() { for _, master := range fs.option.Masters { - _, err := pb.ParseFilerGrpcAddress(master) + _, err := pb.ParseServerToGrpcAddress(master) if err != nil { glog.Fatalf("invalid master address %s: %v", master, err) } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index f77462adb..892a732f7 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -61,15 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } - if len(entry.Chunks) == 0 && len(entry.Content) == 0 { - glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr) - 
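The `Authenticator` added above admits a gRPC peer when its client certificate's `Subject.CommonName` is either on the allow-list or under the configured wildcard domain suffix. The core check, with the gRPC peer plumbing stripped away for a self-contained sketch:

    package main

    import (
    	"fmt"
    	"strings"
    )

    // allowed mirrors the decision in Authenticator.Authenticate.
    func allowed(commonName, wildcardDomain string, commonNames map[string]bool) bool {
    	if wildcardDomain != "" && strings.HasSuffix(commonName, wildcardDomain) {
    		return true
    	}
    	return commonNames[commonName]
    }

    func main() {
    	names := map[string]bool{"volume01.seaweed": true}
    	fmt.Println(allowed("volume01.seaweed", "", names))                // true
    	fmt.Println(allowed("node7.cluster.local", ".cluster.local", nil)) // true
    	fmt.Println(allowed("intruder.example", ".cluster.local", names))  // false
    }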
stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc() - w.WriteHeader(http.StatusNoContent) - return - } - w.Header().Set("Accept-Ranges", "bytes") - w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) // mime type mimeType := entry.Attr.Mime @@ -164,6 +156,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } if offset+size <= int64(len(entry.Content)) { _, err := writer.Write(entry.Content[offset : offset+size]) + if err != nil { + glog.Errorf("failed to write entry content: %v", err) + } return err } return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index d3ce7e605..318399281 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -38,10 +38,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * chunkSize := 1024 * 1024 * maxMB - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() + stats.FilerRequestCounter.WithLabelValues("chunk").Inc() start := time.Now() defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) + stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds()) }() var reply *FilerPostResult @@ -302,13 +302,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { - stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc() + stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc() start := time.Now() defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) + stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds()) }() uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + if uploadResult != nil && uploadResult.RetryCount > 0 { + stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount)) + } return uploadResult, err, data } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 29aff5c0b..156afd4a1 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if !ms.Topo.HasWritableVolume(option) { if ms.Topo.AvailableSpaceFor(option) <= 0 { - return nil, fmt.Errorf("no free volumes left for "+option.String()) + return nil, fmt.Errorf("no free volumes left for " + option.String()) } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { @@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType)) stats := volumeLayout.Stats() - - totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 - resp := &master_pb.StatisticsResponse{ - TotalSize: 
uint64(totalSize), + TotalSize: stats.TotalSize, UsedSize: stats.UsedSize, FileCount: stats.FileCount, } diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go new file mode 100644 index 000000000..a009611da --- /dev/null +++ b/weed/server/volume_server_tcp_handlers_write.go @@ -0,0 +1,137 @@ +package weed_server + +import ( + "bufio" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "net" + "strings" +) + +func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { + defer c.Close() + + glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) + + bufReader := bufio.NewReaderSize(c, 1024*1024) + bufWriter := bufio.NewWriterSize(c, 1024*1024) + + for { + cmd, err := bufReader.ReadString('\n') + if err != nil { + if err != io.EOF { + glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) + } + return + } + cmd = cmd[:len(cmd)-1] + switch cmd[0] { + case '+': + fileId := cmd[1:] + err = vs.handleTcpPut(fileId, bufReader) + if err == nil { + bufWriter.Write([]byte("+OK\n")) + } else { + bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) + } + case '-': + fileId := cmd[1:] + err = vs.handleTcpDelete(fileId) + if err == nil { + bufWriter.Write([]byte("+OK\n")) + } else { + bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) + } + case '?': + fileId := cmd[1:] + err = vs.handleTcpGet(fileId, bufWriter) + case '!': + bufWriter.Flush() + } + + } + +} + +func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + volume := vs.store.GetVolume(volumeId) + if volume == nil { + return fmt.Errorf("volume %d not found", volumeId) + } + + err = volume.StreamRead(n, writer) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + volume := vs.store.GetVolume(volumeId) + if volume == nil { + return fmt.Errorf("volume %d not found", volumeId) + } + + sizeBuf := make([]byte, 4) + if _, err = bufReader.Read(sizeBuf); err != nil { + return err + } + dataSize := util.BytesToUint32(sizeBuf) + + err = volume.StreamWrite(n, bufReader, dataSize) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + _, err = vs.store.DeleteVolumeNeedle(volumeId, n) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { + + commaIndex := strings.LastIndex(fileId, ",") + if commaIndex <= 0 { + return 0, nil, fmt.Errorf("unknown fileId %s", fileId) + } + + vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] + + volumeId, ve := needle.NewVolumeId(vid) + if ve != nil { + return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) + } + + n := new(needle.Needle) + n.ParsePath(fid) + return volumeId, n, nil +} diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index edacf22c6..634cb11e2 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -274,7 +274,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, 
selectedCollection stri quietSeconds := int64(quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() - fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds) + fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds) vidMap := make(map[uint32]bool) eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go new file mode 100644 index 000000000..1fe13d981 --- /dev/null +++ b/weed/shell/command_s3_clean_uploads.go @@ -0,0 +1,92 @@ +package shell + +import ( + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "math" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + Commands = append(Commands, &commandS3CleanUploads{}) +} + +type commandS3CleanUploads struct { +} + +func (c *commandS3CleanUploads) Name() string { + return "s3.clean.uploads" +} + +func (c *commandS3CleanUploads) Help() string { + return `clean up stale multipart uploads + + Example: + s3.clean.uploads -replication 001 + +` +} + +func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"") + if err = bucketCommand.Parse(args); err != nil { + return nil + } + + var filerBucketsPath string + filerBucketsPath, err = readFilerBucketsPath(commandEnv) + if err != nil { + return fmt.Errorf("read buckets: %v", err) + } + + var buckets []string + err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error { + buckets = append(buckets, entry.Name) + return nil + }, "", false, math.MaxUint32) + if err != nil { + return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err) + } + + for _, bucket:= range buckets { + c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo) + } + + return err + +} + +func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error { + uploadsDir := filerBucketsPath+"/"+bucket+"/.uploads" + var staleUploads []string + now := time.Now() + err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error { + ctime := time.Unix(entry.Attributes.Crtime, 0) + if ctime.Add(timeAgo).Before(now) { + staleUploads = append(staleUploads, entry.Name) + } + return nil + }, "", false, math.MaxUint32) + if err != nil { + return fmt.Errorf("list uploads under %v: %v", uploadsDir, err) + } + + for _, staleUpload:= range staleUploads { + deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true",commandEnv.option.FilerHost, commandEnv.option.FilerPort,uploadsDir, staleUpload) + fmt.Fprintf(writer, "purge %s\n", deleteUrl) + + err = util.Delete(deleteUrl, "") + if err != nil { + return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err) + } + } + + return nil + +}
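The new `s3.clean.uploads` command above purges multipart uploads older than the `-timeAgo` cutoff (its Help text appears to carry a copy-pasted `-replication 001` example; the flag actually parsed is `-timeAgo`). The age test at its heart, as a runnable sketch:

    package main

    import (
    	"fmt"
    	"time"
    )

    // isStale reports whether an upload created at crtimeUnix has outlived the
    // -timeAgo cutoff and is eligible for purging.
    func isStale(crtimeUnix int64, timeAgo time.Duration, now time.Time) bool {
    	return time.Unix(crtimeUnix, 0).Add(timeAgo).Before(now)
    }

    func main() {
    	now := time.Now()
    	fmt.Println(isStale(now.Add(-25*time.Hour).Unix(), 24*time.Hour, now)) // true
    	fmt.Println(isStale(now.Add(-time.Hour).Unix(), 24*time.Hour, now))    // false
    }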
\ No newline at end of file diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index a15efd6b3..f7fa94031 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -102,7 +102,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin keepDataNodesSorted(allLocations, toDiskType) fn := capacityByFreeVolumeCount(toDiskType) for _, dst := range allLocations { - if fn(dst.dataNode) > 0 { + if fn(dst.dataNode) > 0 && !hasFoundTarget { // ask the volume server to replicate the volume if isOneOf(dst.dataNode.Id, locations) { continue diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go index b8b883be6..2dc61d02e 100644 --- a/weed/storage/backend/backend.go +++ b/weed/storage/backend/backend.go @@ -58,6 +58,9 @@ func LoadConfiguration(config *util.ViperProxy) { if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") { continue } + if _, found := BackendStorages[backendTypeName+"."+backendStorageId]; found { + continue + } backendStorage, buildErr := backendStorageFactory.BuildStorage(config, StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId) if buildErr != nil { @@ -81,6 +84,9 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) { glog.Warningf("storage type %s not found", storageBackend.Type) continue } + if _, found := BackendStorages[storageBackend.Type+"."+storageBackend.Id]; found { + continue + } backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id) if buildErr != nil { glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id) diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 498963c31..3b42429cf 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -52,7 +52,7 @@ func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) { return } -func (df *DiskFile) Append(p []byte) (n int, err error) { +func (df *DiskFile) Write(p []byte) (n int, err error) { return df.WriteAt(p, df.fileSize) } diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go index e2fdf1eb6..b8378c379 100644 --- a/weed/storage/backend/s3_backend/s3_sessions.go +++ b/weed/storage/backend/s3_backend/s3_sessions.go @@ -34,8 +34,9 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region, endpoint string) } config := &aws.Config{ - Region: aws.String(region), - Endpoint: aws.String(endpoint), + Region: aws.String(region), + Endpoint: aws.String(endpoint), + S3ForcePathStyle: aws.Bool(true), } if awsAccessKeyId != "" && awsSecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 85d6a5fc8..171db92a4 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -63,7 +63,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection // read volume info ev.Version = needle.Version3 - if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found { + if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found { ev.Version = needle.Version(volumeInfo.Version) } else { 
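`LoadConfiguration` and `LoadFromPbStorageBackends` above now skip backend IDs that are already registered, making configuration reloads idempotent. A minimal sketch of that guard (the registry value type is simplified here):

    package main

    import "fmt"

    var backendStorages = map[string]string{}

    // register keeps the existing instance on a repeat registration instead of
    // rebuilding it, so live references stay valid across reloads.
    func register(id, cfg string) {
    	if _, found := backendStorages[id]; found {
    		return
    	}
    	backendStorages[id] = cfg
    }

    func main() {
    	register("s3.default", "first")
    	register("s3.default", "second") // ignored on reload
    	fmt.Println(backendStorages["s3.default"]) // first
    }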
pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go index 6fd910bb7..22456faa2 100644 --- a/weed/storage/needle/crc.go +++ b/weed/storage/needle/crc.go @@ -2,6 +2,7 @@ package needle import ( "fmt" + "io" "github.com/klauspost/crc32" @@ -29,3 +30,25 @@ func (n *Needle) Etag() string { util.Uint32toBytes(bits, uint32(n.Checksum)) return fmt.Sprintf("%x", bits) } + +func NewCRCwriter(w io.Writer) *CRCwriter { + + return &CRCwriter{ + crc: CRC(0), + w: w, + } + +} + +type CRCwriter struct { + crc CRC + w io.Writer +} + +func (c *CRCwriter) Write(p []byte) (n int, err error) { + n, err = c.w.Write(p) // with each write ... + c.crc = c.crc.Update(p) + return +} + +func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 0f72bc0bb..e51df955e 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -168,7 +168,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi } if err != nil { fileSize, _, _ := r.GetStat() - println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize) + println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize) } return dataSlice, err diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 9716e9729..31c86d124 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -152,8 +152,10 @@ func (m *LevelDbNeedleMap) Close() { glog.Warningf("close index file %s failed: %v", indexFileName, err) } - if err := m.db.Close(); err != nil { - glog.Warningf("close levelDB failed: %v", err) + if m.db != nil { + if err := m.db.Close(); err != nil { + glog.Warningf("close levelDB failed: %v", err) + } } } diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go index 3449ff9dc..662b90531 100644 --- a/weed/storage/needle_map_sorted_file.go +++ b/weed/storage/needle_map_sorted_file.go @@ -94,8 +94,12 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error { } func (m *SortedFileNeedleMap) Close() { - m.indexFile.Close() - m.dbFile.Close() + if m.indexFile != nil { + m.indexFile.Close() + } + if m.dbFile != nil { + m.dbFile.Close() + } } func (m *SortedFileNeedleMap) Destroy() error { diff --git a/weed/storage/store.go b/weed/storage/store.go index 47829666a..fb33a708c 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -220,20 +220,30 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { if maxFileKey < curMaxFileKey { maxFileKey = curMaxFileKey } + deleteVolume := false if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) { volumeMessages = append(volumeMessages, volumeMessage) } else { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { deleteVids = append(deleteVids, v.Id) + deleteVolume = true } else { glog.V(0).Infof("volume %d is expired", v.Id) } if v.lastIoError != nil { deleteVids = append(deleteVids, v.Id) + deleteVolume = true glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError) } } - collectionVolumeSize[v.Collection] += volumeMessage.Size + + if _, exist := collectionVolumeSize[v.Collection]; !exist { + collectionVolumeSize[v.Collection] = 0 + } + if !deleteVolume { + collectionVolumeSize[v.Collection] += volumeMessage.Size + } + if _, exist := 
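The new `needle.CRCwriter` above wraps an `io.Writer` and folds every written byte into a running CRC, so a streamed upload gets its checksum in a single pass. The upstream code uses the `github.com/klauspost/crc32` package; the sketch below uses the standard library's Castagnoli table to stay self-contained:

    package main

    import (
    	"fmt"
    	"hash/crc32"
    	"io"
    	"os"
    )

    var castagnoli = crc32.MakeTable(crc32.Castagnoli)

    // crcWriter forwards writes and updates the checksum with each call.
    type crcWriter struct {
    	w   io.Writer
    	crc uint32
    }

    func (c *crcWriter) Write(p []byte) (n int, err error) {
    	n, err = c.w.Write(p)
    	c.crc = crc32.Update(c.crc, castagnoli, p[:n])
    	return
    }

    func main() {
    	cw := &crcWriter{w: os.Stdout}
    	io.WriteString(cw, "hello\n")
    	fmt.Printf("crc32c=%08x\n", cw.crc)
    }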
collectionVolumeReadOnlyCount[v.Collection]; !exist { collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{ "IsReadOnly": 0, @@ -242,7 +252,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { "isDiskSpaceLow": 0, } } - if v.IsReadOnly() { + if !deleteVolume && v.IsReadOnly() { collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 if v.noWriteOrDelete { collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 @@ -267,7 +277,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { glog.V(0).Infof("volume %d is deleted", vid) } } else { - glog.V(0).Infof("delete volume %d: %v", vid, err) + glog.Warningf("delete volume %d: %v", vid, err) } } location.volumesLock.Unlock() @@ -446,7 +456,7 @@ func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error { // load, modify, save baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name())) vifFile := filepath.Join(location.Directory, baseFileName+".vif") - volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile) + volumeInfo, _, _, err := pb.MaybeLoadVolumeInfo(vifFile) if err != nil { return fmt.Errorf("volume %d fail to load vif", i) } diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go index 65ec53819..a263e6669 100644 --- a/weed/storage/super_block/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -36,6 +36,9 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) { } func (rp *ReplicaPlacement) Byte() byte { + if rp == nil { + return 0 + } ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount return byte(ret) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index bff1055bb..0cf603ad8 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -39,12 +39,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } }() - hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0 + hasVolumeInfoFile := v.maybeLoadVolumeInfo() if v.HasRemoteFile() { v.noWriteCanDelete = true v.noWriteOrDelete = false - glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files) + glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo) v.LoadRemoteFile() alreadyHasSuperBlock = true } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists { @@ -83,6 +83,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if alreadyHasSuperBlock { err = v.readSuperBlock() + glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version) + if v.HasRemoteFile() { + // maybe temporary network problem + glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err) + err = nil + } } else { if !v.SuperBlock.Initialized() { return fmt.Errorf("volume %s not initialized", v.FileName(".dat")) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 07376bc88..1853e458a 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -104,47 +104,8 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan err = fmt.Errorf("volume size limit %d exceeded! 
current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) return } - if v.isFileUnchanged(n) { - size = Size(n.DataSize) - isUnchanged = true - return - } - - // check whether existing needle cookie matches - nv, ok := v.nm.Get(n.Id) - if ok { - existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset()) - if existingNeedleReadErr != nil { - err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) - return - } - if existingNeedle.Cookie != n.Cookie { - glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie) - err = fmt.Errorf("mismatching cookie %x", n.Cookie) - return - } - } - - // append to dat file - n.AppendAtNs = uint64(time.Now().UnixNano()) - offset, size, _, err = n.Append(v.DataBackend, v.Version()) - v.checkReadWriteError(err) - if err != nil { - return - } - - v.lastAppendAtNs = n.AppendAtNs - // add to needle map - if !ok || uint64(nv.Offset.ToActualOffset()) < offset { - if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { - glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) - } - } - if v.lastModifiedTsSeconds < n.LastModified { - v.lastModifiedTsSeconds = n.LastModified - } - return + return v.doWriteRequest(n) } func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) { @@ -223,24 +184,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) { return 0, err } - nv, ok := v.nm.Get(n.Id) - // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) - if ok && nv.Size.IsValid() { - size := nv.Size - n.Data = nil - n.AppendAtNs = uint64(time.Now().UnixNano()) - offset, _, _, err := n.Append(v.DataBackend, v.Version()) - v.checkReadWriteError(err) - if err != nil { - return size, err - } - v.lastAppendAtNs = n.AppendAtNs - if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil { - return size, err - } - return size, err - } - return 0, nil + return v.doDeleteRequest(n) } func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) { diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go new file mode 100644 index 000000000..955875aa2 --- /dev/null +++ b/weed/storage/volume_stream_write.go @@ -0,0 +1,104 @@ +package storage + +import ( + "bufio" + "fmt" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) { + + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + df, ok := v.DataBackend.(*backend.DiskFile) + if !ok { + return fmt.Errorf("unexpected volume backend") + } + offset, _, _ := v.DataBackend.GetStat() + + header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation + CookieToBytes(header[0:CookieSize], n.Cookie) + NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) + n.Size = 4 + Size(dataSize) + 1 + SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) + + n.DataSize = dataSize + + // needle header + df.Write(header[0:NeedleHeaderSize]) + + // data size and data + util.Uint32toBytes(header[0:4], n.DataSize) + df.Write(header[0:4]) + // write and calculate CRC + crcWriter := needle.NewCRCwriter(df) + io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))) + + // flags + util.Uint8toBytes(header[0:1], n.Flags) + df.Write(header[0:1]) + + // data checksum + util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum()) + // write timestamp, padding + n.AppendAtNs = uint64(time.Now().UnixNano()) + util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs) + padding := needle.PaddingLength(n.Size, needle.Version3) + df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding]) + + // add to needle map + if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) + } + return +} + +func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { + + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset.IsZero() { + return ErrorNotFound + } + + sr := &StreamReader{ + readerAt: v.DataBackend, + offset: nv.Offset.ToActualOffset(), + } + bufReader := bufio.NewReader(sr) + bufReader.Discard(NeedleHeaderSize) + sizeBuf := make([]byte, 4) + bufReader.Read(sizeBuf) + if _, err = writer.Write(sizeBuf); err != nil { + return err + } + dataSize := util.BytesToUint32(sizeBuf) + + _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize))) + + return +} + +type StreamReader struct { + offset int64 + readerAt io.ReaderAt +} + +func (sr *StreamReader) Read(p []byte) (n int, err error) { + n, err = sr.readerAt.ReadAt(p, sr.offset) + if err != nil { + return + } + sr.offset += int64(n) + return +} diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go index 77efd8a14..23160906b 100644 --- a/weed/storage/volume_tier.go +++ b/weed/storage/volume_tier.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { @@ -14,13 +15,23 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { func (v *Volume) maybeLoadVolumeInfo() (found bool) { - v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif")) + var err error + v.volumeInfo, v.hasRemoteFile, found, err = pb.MaybeLoadVolumeInfo(v.FileName(".vif")) + + if v.volumeInfo.Version == 0 { + v.volumeInfo.Version = uint32(needle.CurrentVersion) + } if v.hasRemoteFile 
{ glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id, v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key) } + if err != nil { + glog.Warningf("load volume %d.vif file: %v", v.Id, err) + return + } + return } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 0ee1e61c6..be84f8a13 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -286,7 +286,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI if err != nil { return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err) } - dstDatBackend.Append(needleBytes) + dstDatBackend.Write(needleBytes) util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize)) } else { //deleted needle //fakeDelNeedle 's default Data field is nil diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index df1b6d658..330b16b24 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -25,7 +25,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) existingEcShards := dn.GetEcShards() - // found out the newShards and deletedShards + // find out the newShards and deletedShards var newShardCount, deletedShardCount int for _, ecShards := range existingEcShards { @@ -56,20 +56,19 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } + for _, ecShards := range actualShards { + if dn.hasEcShards(ecShards.VolumeId) { + continue + } + + newShards = append(newShards, ecShards) disk := dn.getOrCreateDisk(ecShards.DiskType) deltaDiskUsages := newDiskUsages() deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) - - if !dn.hasEcShards(ecShards.VolumeId) { - newShards = append(newShards, ecShards) - newShardCount += ecShards.ShardIdCount() - } - - deltaDiskUsage.ecShardCount = int64(newShardCount) + deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount()) disk.UpAdjustDiskUsageDelta(deltaDiskUsages) - } if len(newShards) > 0 || len(deletedShards) > 0 { diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 5784c894b..c7e171248 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -432,7 +432,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { if vl.readonlyVolumes.IsTrue(vid) { ret.TotalSize += size } else { - ret.TotalSize += vl.volumeSizeLimit + ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length()) } } diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 64c13ca52..548c4cd25 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -82,7 +82,7 @@ func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64 if dnl.LastSeen < freshThreshHold { vinfo, err := dnl.GetVolumesById(vid) if err == nil { - return vinfo.Size - vinfo.DeletedByteCount, vinfo.FileCount - vinfo.DeleteCount + return (vinfo.Size - vinfo.DeletedByteCount) * uint64(len(dnll.list)), vinfo.FileCount - vinfo.DeleteCount } } } diff --git a/weed/util/constants.go b/weed/util/constants.go index 908f782ae..1cd2f160a 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 27) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 31) COMMIT = 
"" ) diff --git a/weed/util/fasthttp_util.go b/weed/util/fasthttp_util.go index 82575af98..6c31a40da 100644 --- a/weed/util/fasthttp_util.go +++ b/weed/util/fasthttp_util.go @@ -18,6 +18,7 @@ var ( WriteTimeout: time.Second, MaxIdleConnDuration: time.Minute, DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this. + DialDualStack: true, } // Put everything in pools to prevent garbage. diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go index 4a5884e9b..eb5700e8c 100644 --- a/weed/util/fla9/fla9.go +++ b/weed/util/fla9/fla9.go @@ -886,7 +886,7 @@ func (f *FlagSet) parseOne() (bool, error) { // The return value will be ErrHelp if -help or -h were set but not defined. func (f *FlagSet) Parse(arguments []string) error { if _, ok := f.formal[DefaultConfigFlagName]; !ok { - f.String(DefaultConfigFlagName, "", "config file") + f.String(DefaultConfigFlagName, "", "a file of command line options, each line in optionName=optionValue format") } f.parsed = true @@ -995,6 +995,7 @@ func NewFlagSet(name string, errorHandling ErrorHandling) *FlagSet { f := &FlagSet{ name: name, errorHandling: errorHandling, + envPrefix: EnvPrefix, } return f } @@ -1078,7 +1079,7 @@ func NewFlagSetWithEnvPrefix(name string, prefix string, errorHandling ErrorHand // DefaultConfigFlagName defines the flag name of the optional config file // path. Used to lookup and parse the config file when a default is set and // available on disk. -var DefaultConfigFlagName = "conf" +var DefaultConfigFlagName = "options" // ParseFile parses flags from the file in path. // Same format as commandline arguments, newlines and lines beginning with a diff --git a/weed/wdclient/net2/base_connection_pool.go b/weed/wdclient/net2/base_connection_pool.go new file mode 100644 index 000000000..5cc037d0f --- /dev/null +++ b/weed/wdclient/net2/base_connection_pool.go @@ -0,0 +1,159 @@ +package net2 + +import ( + "net" + "strings" + "time" + + rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool" +) + +const defaultDialTimeout = 1 * time.Second + +func defaultDialFunc(network string, address string) (net.Conn, error) { + return net.DialTimeout(network, address, defaultDialTimeout) +} + +func parseResourceLocation(resourceLocation string) ( + network string, + address string) { + + idx := strings.Index(resourceLocation, " ") + if idx >= 0 { + return resourceLocation[:idx], resourceLocation[idx+1:] + } + + return "", resourceLocation +} + +// A thin wrapper around the underlying resource pool. 
+type connectionPoolImpl struct { + options ConnectionOptions + + pool rp.ResourcePool +} + +// This returns a connection pool where all connections are connected +// to the same (network, address). +func newBaseConnectionPool( + options ConnectionOptions, + createPool func(rp.Options) rp.ResourcePool) ConnectionPool { + + dial := options.Dial + if dial == nil { + dial = defaultDialFunc + } + + openFunc := func(loc string) (interface{}, error) { + network, address := parseResourceLocation(loc) + return dial(network, address) + } + + closeFunc := func(handle interface{}) error { + return handle.(net.Conn).Close() + } + + poolOptions := rp.Options{ + MaxActiveHandles: options.MaxActiveConnections, + MaxIdleHandles: options.MaxIdleConnections, + MaxIdleTime: options.MaxIdleTime, + OpenMaxConcurrency: options.DialMaxConcurrency, + Open: openFunc, + Close: closeFunc, + NowFunc: options.NowFunc, + } + + return &connectionPoolImpl{ + options: options, + pool: createPool(poolOptions), + } +} + +// This returns a connection pool where all connections are connected +// to the same (network, address). +func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool { + return newBaseConnectionPool(options, rp.NewSimpleResourcePool) +} + +// This returns a connection pool that manages multiple (network, address) +// entries. The connections to each (network, address) entry act +// independently. For example ("tcp", "localhost:11211") could act as memcache +// shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1. +func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool { + return newBaseConnectionPool( + options, + func(poolOptions rp.Options) rp.ResourcePool { + return rp.NewMultiResourcePool(poolOptions, nil) + }) +} + +// See ConnectionPool for documentation. +func (p *connectionPoolImpl) NumActive() int32 { + return p.pool.NumActive() +} + +// See ConnectionPool for documentation. +func (p *connectionPoolImpl) ActiveHighWaterMark() int32 { + return p.pool.ActiveHighWaterMark() +} + +// This returns the number of alive idle connections. This method is not part +// of ConnectionPool's API. It is used only for testing. +func (p *connectionPoolImpl) NumIdle() int { + return p.pool.NumIdle() +} + +// BaseConnectionPool can only register a single (network, address) entry. +// Register should be called before any Get calls. +func (p *connectionPoolImpl) Register(network string, address string) error { + return p.pool.Register(network + " " + address) +} + +// BaseConnectionPool has nothing to do on Unregister. +func (p *connectionPoolImpl) Unregister(network string, address string) error { + return nil +} + +func (p *connectionPoolImpl) ListRegistered() []NetworkAddress { + result := make([]NetworkAddress, 0, 1) + for _, location := range p.pool.ListRegistered() { + network, address := parseResourceLocation(location) + + result = append( + result, + NetworkAddress{ + Network: network, + Address: address, + }) + } + return result +} + +// This gets an active connection from the connection pool. Note that network +// and address arguments are ignored (The connections will point to the +// network/address provided by the first Register call). +func (p *connectionPoolImpl) Get( + network string, + address string) (ManagedConn, error) { + + handle, err := p.pool.Get(network + " " + address) + if err != nil { + return nil, err + } + return NewManagedConn(network, address, handle, p, p.options), nil +} + +// See ConnectionPool for documentation.
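+// Pool keys take the form "<network> <address>": for example, +// parseResourceLocation("tcp localhost:8080") above splits such a key back +// into ("tcp", "localhost:8080"), while a key without a space yields an +// empty network string. A typical lifecycle (sketch): Register("tcp", addr) +// once, then conn, err := pool.Get("tcp", addr) per request, followed by +// pool.Release(conn) on success or pool.Discard(conn) on failure.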
+func (p *connectionPoolImpl) Release(conn ManagedConn) error { + return conn.ReleaseConnection() +} + +// See ConnectionPool for documentation. +func (p *connectionPoolImpl) Discard(conn ManagedConn) error { + return conn.DiscardConnection() +} + +// See ConnectionPool for documentation. +func (p *connectionPoolImpl) EnterLameDuckMode() { + p.pool.EnterLameDuckMode() +} diff --git a/weed/wdclient/net2/connection_pool.go b/weed/wdclient/net2/connection_pool.go new file mode 100644 index 000000000..5b8d4d232 --- /dev/null +++ b/weed/wdclient/net2/connection_pool.go @@ -0,0 +1,97 @@ +package net2 + +import ( + "net" + "time" +) + +type ConnectionOptions struct { + // The maximum number of connections that can be active per host at any + // given time (A non-positive value indicates the number of connections + // is unbounded). + MaxActiveConnections int32 + + // The maximum number of idle connections per host that are kept alive by + // the connection pool. + MaxIdleConnections uint32 + + // The maximum amount of time an idle connection can remain alive (if specified). + MaxIdleTime *time.Duration + + // This limits the number of concurrent Dial calls (there's no limit when + // DialMaxConcurrency is non-positive). + DialMaxConcurrency int + + // Dial specifies the dial function for creating network connections. + // If Dial is nil, net.DialTimeout is used, with timeout set to 1 second. + Dial func(network string, address string) (net.Conn, error) + + // This specifies the now time function. When the function is non-nil, the + // connection pool will use the specified function instead of time.Now to + // generate the current time. + NowFunc func() time.Time + + // This specifies the timeout for any Read() operation. + // Note that setting this to 0 (i.e. not setting it) will make + // read operations block indefinitely. + ReadTimeout time.Duration + + // This specifies the timeout for any Write() operation. + // Note that setting this to 0 (i.e. not setting it) will make + // write operations block indefinitely. + WriteTimeout time.Duration +} + +func (o ConnectionOptions) getCurrentTime() time.Time { + if o.NowFunc == nil { + return time.Now() + } else { + return o.NowFunc() + } +} + +// A generic interface for a managed connection pool. All connection pool +// implementations must be thread-safe. +type ConnectionPool interface { + // This returns the number of active connections that are on loan. + NumActive() int32 + + // This returns the highest number of active connections for the entire + // lifetime of the pool. + ActiveHighWaterMark() int32 + + // This returns the number of idle connections that are in the pool. + NumIdle() int + + // This associates (network, address) with the connection pool; after which, + // the user can get connections to (network, address). + Register(network string, address string) error + + // This dissociates (network, address) from the connection pool; + // after which, the user can no longer get connections to + // (network, address). + Unregister(network string, address string) error + + // This returns the list of registered (network, address) entries. + ListRegistered() []NetworkAddress + + // This gets an active connection from the connection pool. The connection + // will remain active until one of the following is called: + // 1. conn.ReleaseConnection() + // 2. conn.DiscardConnection() + // 3. pool.Release(conn) + // 4.
pool.Discard(conn) + Get(network string, address string) (ManagedConn, error) + + // This releases an active connection back to the connection pool. + Release(conn ManagedConn) error + + // This discards an active connection from the connection pool. + Discard(conn ManagedConn) error + + // Enter the connection pool into lame duck mode. The connection pool + // will no longer return connections, and all idle connections are closed + // immediately (including active connections that are released back to the + // pool afterward). + EnterLameDuckMode() +} diff --git a/weed/wdclient/net2/doc.go b/weed/wdclient/net2/doc.go new file mode 100644 index 000000000..f4d6552e4 --- /dev/null +++ b/weed/wdclient/net2/doc.go @@ -0,0 +1,6 @@ +// net2 is a collection of functions meant to supplement the capabilities +// provided by the standard "net" package. +package net2 + +// copied from https://github.com/dropbox/godropbox/tree/master/net2 +// removed other dependencies
\ No newline at end of file diff --git a/weed/wdclient/net2/ip.go b/weed/wdclient/net2/ip.go new file mode 100644 index 000000000..60e46342f --- /dev/null +++ b/weed/wdclient/net2/ip.go @@ -0,0 +1,177 @@ +package net2 + +import ( + "fmt" + "log" + "net" + "os" + "strings" + "sync" +) + +var myHostname string +var myHostnameOnce sync.Once + +// Like os.Hostname but caches first successful result, making it cheap to call it +// over and over. +// It will also crash the whole process if fetching Hostname fails! +func MyHostname() string { + myHostnameOnce.Do(func() { + var err error + myHostname, err = os.Hostname() + if err != nil { + log.Fatal(err) + } + }) + return myHostname +} + +var myIp4 *net.IPAddr +var myIp4Once sync.Once + +// Resolves `MyHostname()` to an Ip4 address. Caches first successful result, making it +// cheap to call it over and over. +// It will also crash the whole process if resolving the IP fails! +func MyIp4() *net.IPAddr { + myIp4Once.Do(func() { + var err error + myIp4, err = net.ResolveIPAddr("ip4", MyHostname()) + if err != nil { + log.Fatal(err) + } + }) + return myIp4 +} + +var myIp6 *net.IPAddr +var myIp6Once sync.Once + +// Resolves `MyHostname()` to an Ip6 address. Caches first successful result, making it +// cheap to call it over and over. +// It will also crash the whole process if resolving the IP fails! +func MyIp6() *net.IPAddr { + myIp6Once.Do(func() { + var err error + myIp6, err = net.ResolveIPAddr("ip6", MyHostname()) + if err != nil { + log.Fatal(err) + } + }) + return myIp6 +} + +// This returns the list of local ip addresses which other hosts can connect +// to (NOTE: Loopback ip is ignored). +// Also resolves the hostname to an address and adds it to the list, so +// IPs from /etc/hosts work as well. +func GetLocalIPs() ([]*net.IP, error) { + hostname, err := os.Hostname() + if err != nil { + return nil, fmt.Errorf("Failed to lookup hostname: %v", err) + } + // Resolves the IP address from the hostname, so that overrides in + // /etc/hosts also work for IP resolution. + ipInfo, err := net.ResolveIPAddr("ip4", hostname) + if err != nil { + return nil, fmt.Errorf("Failed to resolve ip: %v", err) + } + ips := []*net.IP{&ipInfo.IP} + + // TODO(zviad): Is rest of the code really necessary? + addrs, err := net.InterfaceAddrs() + if err != nil { + return nil, fmt.Errorf("Failed to get interface addresses: %v", err) + } + for _, addr := range addrs { + ipnet, ok := addr.(*net.IPNet) + if !ok { + continue + } + + if ipnet.IP.IsLoopback() { + continue + } + + ips = append(ips, &ipnet.IP) + } + return ips, nil +} + +var localhostIPNets []*net.IPNet + +func init() { + for _, mask := range []string{"127.0.0.1/8", "::1/128"} { + _, ipnet, err := net.ParseCIDR(mask) + if err != nil { + panic(err) + } + localhostIPNets = append(localhostIPNets, ipnet) + } +} + +func IsLocalhostIp(ipStr string) bool { + ip := net.ParseIP(ipStr) + if ip == nil { + return false + } + for _, ipnet := range localhostIPNets { + if ipnet.Contains(ip) { + return true + } + } + return false +} + +// Given a host string, return true if the host is an ip (v4/v6) localhost. +func IsLocalhost(host string) bool { + return IsLocalhostIp(host) || + host == "localhost" || + host == "ip6-localhost" || + host == "ipv6-localhost" +} + +// Resolves hostnames in addresses to actual IP4 addresses. Skips all invalid addresses +// and all addresses that can't be resolved. +// `addrs` are assumed to be of the form: ["<hostname>:<port>", ...]
+// Returns an error in addition to resolved addresses if not all resolutions succeed. +func ResolveIP4s(addrs []string) ([]string, error) { + resolvedAddrs := make([]string, 0, len(addrs)) + var lastErr error + + for _, server := range addrs { + hostPort := strings.Split(server, ":") + if len(hostPort) != 2 { + lastErr = fmt.Errorf("Skipping invalid address: %s", server) + continue + } + + ip, err := net.ResolveIPAddr("ip4", hostPort[0]) + if err != nil { + lastErr = err + continue + } + resolvedAddrs = append(resolvedAddrs, ip.IP.String()+":"+hostPort[1]) + } + return resolvedAddrs, lastErr +} + +func LookupValidAddrs() (map[string]bool, error) { + hostName, err := os.Hostname() + if err != nil { + return nil, err + } + addrs, err := net.LookupHost(hostName) + if err != nil { + return nil, err + } + validAddrs := make(map[string]bool) + validAddrs[hostName] = true + for _, addr := range addrs { + validAddrs[addr] = true + } + // Special case localhost/127.0.0.1 so that this works on devVMs. It should + // have no effect in production. + validAddrs["127.0.0.1"] = true + validAddrs["localhost"] = true + return validAddrs, nil +} diff --git a/weed/wdclient/net2/managed_connection.go b/weed/wdclient/net2/managed_connection.go new file mode 100644 index 000000000..a886210d1 --- /dev/null +++ b/weed/wdclient/net2/managed_connection.go @@ -0,0 +1,185 @@ +package net2 + +import ( + "fmt" + "net" + "time" + + "errors" + "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool" +) + +// Dial's arguments. +type NetworkAddress struct { + Network string + Address string +} + +// A connection managed by a connection pool. NOTE: SetDeadline, +// SetReadDeadline and SetWriteDeadline are disabled for managed connections. +// (The deadlines are set by the connection pool). +type ManagedConn interface { + net.Conn + + // This returns the original (network, address) entry used for creating + // the connection. + Key() NetworkAddress + + // This returns the underlying net.Conn implementation. + RawConn() net.Conn + + // This returns the connection pool which owns this connection. + Owner() ConnectionPool + + // This indicates a user is done with the connection and releases the + // connection back to the connection pool. + ReleaseConnection() error + + // This indicates the connection is in an invalid state, and that the + // connection should be discarded from the connection pool. + DiscardConnection() error +} + +// A physical implementation of ManagedConn +type managedConnImpl struct { + addr NetworkAddress + handle resource_pool.ManagedHandle + pool ConnectionPool + options ConnectionOptions +} + +// This creates a managed connection wrapper. +func NewManagedConn( + network string, + address string, + handle resource_pool.ManagedHandle, + pool ConnectionPool, + options ConnectionOptions) ManagedConn { + + addr := NetworkAddress{ + Network: network, + Address: address, + } + + return &managedConnImpl{ + addr: addr, + handle: handle, + pool: pool, + options: options, + } +} + +func (c *managedConnImpl) rawConn() (net.Conn, error) { + h, err := c.handle.Handle() + return h.(net.Conn), err +} + +// See ManagedConn for documentation. +func (c *managedConnImpl) RawConn() net.Conn { + h, _ := c.handle.Handle() + return h.(net.Conn) +} + +// See ManagedConn for documentation. +func (c *managedConnImpl) Key() NetworkAddress { + return c.addr +} + +// See ManagedConn for documentation. +func (c *managedConnImpl) Owner() ConnectionPool { + return c.pool +} + +// See ManagedConn for documentation.
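+// A connection should end with exactly one of the two calls below: +// ReleaseConnection to hand it back for reuse, or DiscardConnection to drop +// it, mirroring the success/error defer in PutFileChunk further down.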
+func (c *managedConnImpl) ReleaseConnection() error { + return c.handle.Release() +} + +// See ManagedConn for documentation. +func (c *managedConnImpl) DiscardConnection() error { + return c.handle.Discard() +} + +// See net.Conn for documentation +func (c *managedConnImpl) Read(b []byte) (n int, err error) { + conn, err := c.rawConn() + if err != nil { + return 0, err + } + + if c.options.ReadTimeout > 0 { + deadline := c.options.getCurrentTime().Add(c.options.ReadTimeout) + _ = conn.SetReadDeadline(deadline) + } + n, err = conn.Read(b) + if err != nil { + var localAddr string + if conn.LocalAddr() != nil { + localAddr = conn.LocalAddr().String() + } else { + localAddr = "(nil)" + } + + var remoteAddr string + if conn.RemoteAddr() != nil { + remoteAddr = conn.RemoteAddr().String() + } else { + remoteAddr = "(nil)" + } + err = fmt.Errorf("Read error from host: %s <-> %s: %v", localAddr, remoteAddr, err) + } + return +} + +// See net.Conn for documentation +func (c *managedConnImpl) Write(b []byte) (n int, err error) { + conn, err := c.rawConn() + if err != nil { + return 0, err + } + + if c.options.WriteTimeout > 0 { + deadline := c.options.getCurrentTime().Add(c.options.WriteTimeout) + _ = conn.SetWriteDeadline(deadline) + } + n, err = conn.Write(b) + if err != nil { + err = fmt.Errorf("Write error: %v", err) + } + return +} + +// See net.Conn for documentation +func (c *managedConnImpl) Close() error { + return c.handle.Discard() +} + +// See net.Conn for documentation +func (c *managedConnImpl) LocalAddr() net.Addr { + conn, _ := c.rawConn() + return conn.LocalAddr() +} + +// See net.Conn for documentation +func (c *managedConnImpl) RemoteAddr() net.Addr { + conn, _ := c.rawConn() + return conn.RemoteAddr() +} + +// SetDeadline is disabled for managed connection (The deadline is set by +// us, with respect to the read/write timeouts specified in ConnectionOptions). +func (c *managedConnImpl) SetDeadline(t time.Time) error { + return errors.New("Cannot set deadline for managed connection") +} + +// SetReadDeadline is disabled for managed connection (The deadline is set by +// us with respect to the read timeout specified in ConnectionOptions). +func (c *managedConnImpl) SetReadDeadline(t time.Time) error { + return errors.New("Cannot set read deadline for managed connection") +} + +// SetWriteDeadline is disabled for managed connection (The deadline is set by +// us with respect to the write timeout specified in ConnectionOptions). +func (c *managedConnImpl) SetWriteDeadline(t time.Time) error { + return errors.New("Cannot set write deadline for managed connection") +} diff --git a/weed/wdclient/net2/port.go b/weed/wdclient/net2/port.go new file mode 100644 index 000000000..f83adba28 --- /dev/null +++ b/weed/wdclient/net2/port.go @@ -0,0 +1,19 @@ +package net2 + +import ( + "net" + "strconv" +) + +// Returns the port information. +func GetPort(addr net.Addr) (int, error) { + _, lport, err := net.SplitHostPort(addr.String()) + if err != nil { + return -1, err + } + lportInt, err := strconv.Atoi(lport) + if err != nil { + return -1, err + } + return lportInt, nil +} diff --git a/weed/wdclient/resource_pool/doc.go b/weed/wdclient/resource_pool/doc.go new file mode 100644 index 000000000..c17b17c6c --- /dev/null +++ b/weed/wdclient/resource_pool/doc.go @@ -0,0 +1,5 @@ +// A generic resource pool for managing resources such as network connections. +package resource_pool + +// copied from https://github.com/dropbox/godropbox/tree/master/resource_pool +// removed other dependencies
\ No newline at end of file diff --git a/weed/wdclient/resource_pool/managed_handle.go b/weed/wdclient/resource_pool/managed_handle.go new file mode 100644 index 000000000..e1d82ca7b --- /dev/null +++ b/weed/wdclient/resource_pool/managed_handle.go @@ -0,0 +1,97 @@ +package resource_pool + +import ( + "sync/atomic" + + "errors" +) + +// A resource handle managed by a resource pool. +type ManagedHandle interface { + // This returns the handle's resource location. + ResourceLocation() string + + // This returns the underlying resource handle (or error if the handle + // is no longer active). + Handle() (interface{}, error) + + // This returns the resource pool which owns this handle. + Owner() ResourcePool + + // This releases the underlying resource handle to the caller and marks the + // managed handle as inactive. The caller is responsible for cleaning up + // the released handle. This returns nil if the managed handle no longer + // owns the resource. + ReleaseUnderlyingHandle() interface{} + + // This indicates a user is done with the handle and releases the handle + // back to the resource pool. + Release() error + + // This indicates the handle is in an invalid state, and that the + // handle should be discarded from the resource pool. + Discard() error +} + +// A physical implementation of ManagedHandle +type managedHandleImpl struct { + location string + handle interface{} + pool ResourcePool + isActive int32 // atomic bool + options Options +} + +// This creates a managed handle wrapper. +func NewManagedHandle( + resourceLocation string, + handle interface{}, + pool ResourcePool, + options Options) ManagedHandle { + + h := &managedHandleImpl{ + location: resourceLocation, + handle: handle, + pool: pool, + options: options, + } + atomic.StoreInt32(&h.isActive, 1) + + return h +} + +// See ManagedHandle for documentation. +func (c *managedHandleImpl) ResourceLocation() string { + return c.location +} + +// See ManagedHandle for documentation. +func (c *managedHandleImpl) Handle() (interface{}, error) { + if atomic.LoadInt32(&c.isActive) == 0 { + return c.handle, errors.New("Resource handle is no longer valid") + } + return c.handle, nil +} + +// See ManagedHandle for documentation. +func (c *managedHandleImpl) Owner() ResourcePool { + return c.pool +} + +// See ManagedHandle for documentation. +func (c *managedHandleImpl) ReleaseUnderlyingHandle() interface{} { + if atomic.CompareAndSwapInt32(&c.isActive, 1, 0) { + return c.handle + } + return nil +} + +// See ManagedHandle for documentation. +func (c *managedHandleImpl) Release() error { + return c.pool.Release(c) +} + +// See ManagedHandle for documentation. +func (c *managedHandleImpl) Discard() error { + return c.pool.Discard(c) +} diff --git a/weed/wdclient/resource_pool/multi_resource_pool.go b/weed/wdclient/resource_pool/multi_resource_pool.go new file mode 100644 index 000000000..9ac25526d --- /dev/null +++ b/weed/wdclient/resource_pool/multi_resource_pool.go @@ -0,0 +1,200 @@ +package resource_pool + +import ( + "fmt" + "sync" + + "errors" +) + +// A resource pool implementation that manages multiple resource location +// entries. The handles to each resource location entry act independently. +// For example "tcp localhost:11211" could act as memcache +// shard 0 and "tcp localhost:11212" could act as memcache shard 1.
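+// In this diff, VolumeTcpClient builds on this multi pool via +// net2.NewMultiConnectionPool, keyed by "tcp <volumeServerAddress>" strings, +// one sub-pool per volume server.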
+type multiResourcePool struct { + options Options + + createPool func(Options) ResourcePool + + rwMutex sync.RWMutex + isLameDuck bool // guarded by rwMutex + // NOTE: the locationPools is guarded by rwMutex, but the pool entries + // are not. + locationPools map[string]ResourcePool +} + +// This returns a MultiResourcePool, which manages multiple +// resource location entries. The handles to each resource location +// entry act independently. +// +// When createPool is nil, NewSimpleResourcePool is used as default. +func NewMultiResourcePool( + options Options, + createPool func(Options) ResourcePool) ResourcePool { + + if createPool == nil { + createPool = NewSimpleResourcePool + } + + return &multiResourcePool{ + options: options, + createPool: createPool, + rwMutex: sync.RWMutex{}, + isLameDuck: false, + locationPools: make(map[string]ResourcePool), + } +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) NumActive() int32 { + total := int32(0) + + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() + + for _, pool := range p.locationPools { + total += pool.NumActive() + } + return total +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) ActiveHighWaterMark() int32 { + high := int32(0) + + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() + + for _, pool := range p.locationPools { + val := pool.ActiveHighWaterMark() + if val > high { + high = val + } + } + return high +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) NumIdle() int { + total := 0 + + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() + + for _, pool := range p.locationPools { + total += pool.NumIdle() + } + return total +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Register(resourceLocation string) error { + if resourceLocation == "" { + return errors.New("Registering invalid resource location") + } + + p.rwMutex.Lock() + defer p.rwMutex.Unlock() + + if p.isLameDuck { + return fmt.Errorf( + "Cannot register %s to lame duck resource pool", + resourceLocation) + } + + if _, inMap := p.locationPools[resourceLocation]; inMap { + return nil + } + + pool := p.createPool(p.options) + if err := pool.Register(resourceLocation); err != nil { + return err + } + + p.locationPools[resourceLocation] = pool + return nil +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Unregister(resourceLocation string) error { + p.rwMutex.Lock() + defer p.rwMutex.Unlock() + + if pool, inMap := p.locationPools[resourceLocation]; inMap { + _ = pool.Unregister("") + pool.EnterLameDuckMode() + delete(p.locationPools, resourceLocation) + } + return nil +} + +func (p *multiResourcePool) ListRegistered() []string { + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() + + result := make([]string, 0, len(p.locationPools)) + for key := range p.locationPools { + result = append(result, key) + } + + return result +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Get( + resourceLocation string) (ManagedHandle, error) { + + pool := p.getPool(resourceLocation) + if pool == nil { + return nil, fmt.Errorf( + "%s is not registered in the resource pool", + resourceLocation) + } + return pool.Get(resourceLocation) +} + +// See ResourcePool for documentation.
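+// Release and Discard below route a handle back to the sub-pool that owns +// it, looked up by the handle's resource location.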
+func (p *multiResourcePool) Release(handle ManagedHandle) error { + pool := p.getPool(handle.ResourceLocation()) + if pool == nil { + return errors.New( + "Resource pool cannot take control of a handle owned " + + "by another resource pool") + } + + return pool.Release(handle) +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) Discard(handle ManagedHandle) error { + pool := p.getPool(handle.ResourceLocation()) + if pool == nil { + return errors.New( + "Resource pool cannot take control of a handle owned " + + "by another resource pool") + } + + return pool.Discard(handle) +} + +// See ResourcePool for documentation. +func (p *multiResourcePool) EnterLameDuckMode() { + p.rwMutex.Lock() + defer p.rwMutex.Unlock() + + p.isLameDuck = true + + for _, pool := range p.locationPools { + pool.EnterLameDuckMode() + } +} + +func (p *multiResourcePool) getPool(resourceLocation string) ResourcePool { + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() + + if pool, inMap := p.locationPools[resourceLocation]; inMap { + return pool + } + return nil +} diff --git a/weed/wdclient/resource_pool/resource_pool.go b/weed/wdclient/resource_pool/resource_pool.go new file mode 100644 index 000000000..26c433f50 --- /dev/null +++ b/weed/wdclient/resource_pool/resource_pool.go @@ -0,0 +1,96 @@ +package resource_pool + +import ( + "time" +) + +type Options struct { + // The maximum number of active resource handles per resource location. (A + // non-positive value indicates the number of active resource handles is + // unbounded). + MaxActiveHandles int32 + + // The maximum number of idle resource handles per resource location that + // are kept alive by the resource pool. + MaxIdleHandles uint32 + + // The maximum amount of time an idle resource handle can remain alive (if + // specified). + MaxIdleTime *time.Duration + + // This limits the number of concurrent Open calls (there's no limit when + // OpenMaxConcurrency is non-positive). + OpenMaxConcurrency int + + // This function creates a resource handle (e.g., a connection) for a + // resource location. The function must be thread-safe. + Open func(resourceLocation string) ( + handle interface{}, + err error) + + // This function destroys a resource handle and performs the necessary + // cleanup to free up resources. The function must be thread-safe. + Close func(handle interface{}) error + + // This specifies the now time function. When the function is non-nil, the + // resource pool will use the specified function instead of time.Now to + // generate the current time. + NowFunc func() time.Time +} + +func (o Options) getCurrentTime() time.Time { + if o.NowFunc == nil { + return time.Now() + } else { + return o.NowFunc() + } +} + +// A generic interface for a managed resource pool. All resource pool +// implementations must be thread-safe. +type ResourcePool interface { + // This returns the number of active resource handles. + NumActive() int32 + + // This returns the highest number of active handles for the entire + // lifetime of the pool. If the pool contains multiple sub-pools, the + // high water mark is the max of the sub-pools' high water marks. + ActiveHighWaterMark() int32 + + // This returns the number of alive idle handles. NOTE: This is only used + // for testing. + NumIdle() int + + // This associates a resource location with the resource pool; after which, + // the user can get resource handles for the resource location.
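+ // (A simple pool accepts exactly one location; a multi pool accepts many.)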
+ Register(resourceLocation string) error + + // This dissociates a resource location from the resource pool; after which, + // the user can no longer get resource handles for the resource location. + // If the given resource location corresponds to a sub-pool, the unregistered + // sub-pool will enter lame duck mode. + Unregister(resourceLocation string) error + + // This returns the list of registered resource location entries. + ListRegistered() []string + + // This gets an active resource handle from the resource pool. The + // handle will remain active until one of the following is called: + // 1. handle.Release() + // 2. handle.Discard() + // 3. pool.Release(handle) + // 4. pool.Discard(handle) + Get(key string) (ManagedHandle, error) + + // This releases an active resource handle back to the resource pool. + Release(handle ManagedHandle) error + + // This discards an active resource from the resource pool. + Discard(handle ManagedHandle) error + + // Enter the resource pool into lame duck mode. The resource pool + // will no longer return resource handles, and all idle resource handles + // are closed immediately (including active resource handles that are + // released back to the pool afterward). + EnterLameDuckMode() +} diff --git a/weed/wdclient/resource_pool/semaphore.go b/weed/wdclient/resource_pool/semaphore.go new file mode 100644 index 000000000..ff35d5bc5 --- /dev/null +++ b/weed/wdclient/resource_pool/semaphore.go @@ -0,0 +1,154 @@ +package resource_pool + +import ( + "fmt" + "sync" + "sync/atomic" + "time" +) + +type Semaphore interface { + // Increment the semaphore counter by one. + Release() + + // Decrement the semaphore counter by one, and block if counter < 0 + Acquire() + + // Decrement the semaphore counter by one, and block if counter < 0 + // Wait for up to the given duration. Returns true if it did not time out. + TryAcquire(timeout time.Duration) bool +} + +// A simple counting Semaphore. +type boundedSemaphore struct { + slots chan struct{} +} + +// Create a bounded semaphore. The count parameter must be a positive number. +// NOTE: The bounded semaphore will panic if the user tries to Release +// beyond the specified count. +func NewBoundedSemaphore(count uint) Semaphore { + sem := &boundedSemaphore{ + slots: make(chan struct{}, int(count)), + } + for i := 0; i < cap(sem.slots); i++ { + sem.slots <- struct{}{} + } + return sem +} + +// Acquire returns on successful acquisition. +func (sem *boundedSemaphore) Acquire() { + <-sem.slots +} + +// TryAcquire returns true if it acquires a resource slot within the +// timeout, false otherwise. +func (sem *boundedSemaphore) TryAcquire(timeout time.Duration) bool { + if timeout > 0 { + // Wait until we get a slot or timeout expires. + tm := time.NewTimer(timeout) + defer tm.Stop() + select { + case <-sem.slots: + return true + case <-tm.C: + // Timeout expired. In very rare cases this might happen even if + // there is a slot available, e.g. GC pause after we create the timer + // and select randomly picked this one out of the two available channels. + // We should do one final immediate check below. + } + } + + // Return true if we have a slot available immediately and false otherwise. + select { + case <-sem.slots: + return true + default: + return false + } +} + +// Release the acquired semaphore. You must not release more than you +// have acquired. +func (sem *boundedSemaphore) Release() { + select { + case sem.slots <- struct{}{}: + default: + // slots is buffered.
If a send blocks, it indicates a programming + // error. + panic(fmt.Errorf("too many releases for boundedSemaphore")) + } +} + +// This returns an unbounded counting semaphore with the specified initial count. +// The semaphore counter can be arbitrarily large (i.e., Release can be called +// an unlimited number of times). +// +// NOTE: In general, users should use bounded semaphore since it is more +// efficient than unbounded semaphore. +func NewUnboundedSemaphore(initialCount int) Semaphore { + res := &unboundedSemaphore{ + counter: int64(initialCount), + } + res.cond.L = &res.lock + return res +} + +type unboundedSemaphore struct { + lock sync.Mutex + cond sync.Cond + counter int64 +} + +func (s *unboundedSemaphore) Release() { + s.lock.Lock() + s.counter += 1 + if s.counter > 0 { + // Not broadcasting here since it's unlikely we can satisfy all waiting + // goroutines. Instead, we will Signal again if there is leftover + // quota after Acquire, in case of lost wakeups. + s.cond.Signal() + } + s.lock.Unlock() +} + +func (s *unboundedSemaphore) Acquire() { + s.lock.Lock() + for s.counter < 1 { + s.cond.Wait() + } + s.counter -= 1 + if s.counter > 0 { + s.cond.Signal() + } + s.lock.Unlock() +} + +func (s *unboundedSemaphore) TryAcquire(timeout time.Duration) bool { + done := make(chan bool, 1) + // Gate used to communicate between the threads and decide what the result + // is. If the main thread decides, we have timed out, otherwise we succeed. + decided := new(int32) + atomic.StoreInt32(decided, 0) + go func() { + s.Acquire() + if atomic.SwapInt32(decided, 1) == 0 { + // Acquire won the race + done <- true + } else { + // If we already decided the result, and this thread did not win + s.Release() + } + }() + select { + case <-done: + return true + case <-time.After(timeout): + if atomic.SwapInt32(decided, 1) == 1 { + // The other thread already decided the result + return true + } + return false + } +} diff --git a/weed/wdclient/resource_pool/simple_resource_pool.go b/weed/wdclient/resource_pool/simple_resource_pool.go new file mode 100644 index 000000000..b0c539100 --- /dev/null +++ b/weed/wdclient/resource_pool/simple_resource_pool.go @@ -0,0 +1,343 @@ +package resource_pool + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "time" +) + +type idleHandle struct { + handle interface{} + keepUntil *time.Time +} + +type TooManyHandles struct { + location string +} + +func (t TooManyHandles) Error() string { + return fmt.Sprintf("Too many handles to %s", t.location) +} + +type OpenHandleError struct { + location string + err error +} + +func (o OpenHandleError) Error() string { + return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err) +} + +// A resource pool implementation where all handles are associated with the +// same resource location. +type simpleResourcePool struct { + options Options + + numActive *int32 // atomic counter + + activeHighWaterMark *int32 // atomic / monotonically increasing value + + openTokens Semaphore + + mutex sync.Mutex + location string // guarded by mutex + idleHandles []*idleHandle // guarded by mutex + isLameDuck bool // guarded by mutex +} + +// This returns a SimpleResourcePool, where all handles are associated with a +// single resource location.
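+// As wired up in newBaseConnectionPool above, Open typically dials a +// connection and Close closes it; the pool itself is agnostic to the +// concrete handle type.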
+func NewSimpleResourcePool(options Options) ResourcePool { + numActive := new(int32) + atomic.StoreInt32(numActive, 0) + + activeHighWaterMark := new(int32) + atomic.StoreInt32(activeHighWaterMark, 0) + + var tokens Semaphore + if options.OpenMaxConcurrency > 0 { + tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency)) + } + + return &simpleResourcePool{ + location: "", + options: options, + numActive: numActive, + activeHighWaterMark: activeHighWaterMark, + openTokens: tokens, + mutex: sync.Mutex{}, + idleHandles: make([]*idleHandle, 0, 0), + isLameDuck: false, + } +} + +// See ResourcePool for documentation. +func (p *simpleResourcePool) NumActive() int32 { + return atomic.LoadInt32(p.numActive) +} + +// See ResourcePool for documentation. +func (p *simpleResourcePool) ActiveHighWaterMark() int32 { + return atomic.LoadInt32(p.activeHighWaterMark) +} + +// See ResourcePool for documentation. +func (p *simpleResourcePool) NumIdle() int { + p.mutex.Lock() + defer p.mutex.Unlock() + return len(p.idleHandles) +} + +// SimpleResourcePool can only register a single (network, address) entry. +// Register should be called before any Get calls. +func (p *simpleResourcePool) Register(resourceLocation string) error { + if resourceLocation == "" { + return errors.New("Invalid resource location") + } + + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.isLameDuck { + return fmt.Errorf( + "cannot register %s to lame duck resource pool", + resourceLocation) + } + + if p.location == "" { + p.location = resourceLocation + return nil + } + return errors.New("SimpleResourcePool can only register one location") +} + +// SimpleResourcePool will enter lame duck mode upon calling Unregister. +func (p *simpleResourcePool) Unregister(resourceLocation string) error { + p.EnterLameDuckMode() + return nil +} + +func (p *simpleResourcePool) ListRegistered() []string { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.location != "" { + return []string{p.location} + } + return []string{} +} + +func (p *simpleResourcePool) getLocation() (string, error) { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.location == "" { + return "", fmt.Errorf( + "resource location is not set for SimpleResourcePool") + } + + if p.isLameDuck { + return "", fmt.Errorf( + "lame duck resource pool cannot return handles to %s", + p.location) + } + + return p.location, nil +} + +// This gets an active resource from the resource pool. Note that the +// resourceLocation argument is ignored (The handles are associated to the +// resource location provided by the first Register call). +func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) { + activeCount := atomic.AddInt32(p.numActive, 1) + if p.options.MaxActiveHandles > 0 && + activeCount > p.options.MaxActiveHandles { + + atomic.AddInt32(p.numActive, -1) + return nil, TooManyHandles{p.location} + } + + highest := atomic.LoadInt32(p.activeHighWaterMark) + for activeCount > highest && + !atomic.CompareAndSwapInt32( + p.activeHighWaterMark, + highest, + activeCount) { + + highest = atomic.LoadInt32(p.activeHighWaterMark) + } + + if h := p.getIdleHandle(); h != nil { + return h, nil + } + + location, err := p.getLocation() + if err != nil { + atomic.AddInt32(p.numActive, -1) + return nil, err + } + + if p.openTokens != nil { + // Current implementation does not wait for tokens to become available. + // If that causes availability hits, we could increase the wait, + // similar to simple_pool.go.
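+ // With a zero timeout, TryAcquire skips the timer entirely and only + // performs the non-blocking receive on the semaphore's buffered channel.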
+ if p.openTokens.TryAcquire(0) { + defer p.openTokens.Release() + } else { + // We could not immediately acquire a token. + // Instead of waiting, fail the call immediately. + atomic.AddInt32(p.numActive, -1) + return nil, OpenHandleError{ + p.location, errors.New("Open Error: reached OpenMaxConcurrency")} + } + } + + handle, err := p.options.Open(location) + if err != nil { + atomic.AddInt32(p.numActive, -1) + return nil, OpenHandleError{p.location, err} + } + + return NewManagedHandle(p.location, handle, p, p.options), nil +} + +// See ResourcePool for documentation. +func (p *simpleResourcePool) Release(handle ManagedHandle) error { + if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p { + return errors.New( + "Resource pool cannot take control of a handle owned " + + "by another resource pool") + } + + h := handle.ReleaseUnderlyingHandle() + if h != nil { + // We can unref either before or after queuing the idle handle. + // The advantage of unref-ing before queuing is that there is + // a higher chance of successful Get when number of active handles + // is close to the limit (but potentially more handle creation). + // The advantage of queuing before unref-ing is that there's a + // higher chance of reusing handle (but potentially more Get failures). + atomic.AddInt32(p.numActive, -1) + p.queueIdleHandles(h) + } + + return nil +} + +// See ResourcePool for documentation. +func (p *simpleResourcePool) Discard(handle ManagedHandle) error { + if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p { + return errors.New( + "Resource pool cannot take control of a handle owned " + + "by another resource pool") + } + + h := handle.ReleaseUnderlyingHandle() + if h != nil { + atomic.AddInt32(p.numActive, -1) + if err := p.options.Close(h); err != nil { + return fmt.Errorf("failed to close resource handle: %v", err) + } + } + return nil +} + +// See ResourcePool for documentation. +func (p *simpleResourcePool) EnterLameDuckMode() { + p.mutex.Lock() + + toClose := p.idleHandles + p.isLameDuck = true + p.idleHandles = []*idleHandle{} + + p.mutex.Unlock() + + p.closeHandles(toClose) +} + +// This returns an idle resource, if there is one. +func (p *simpleResourcePool) getIdleHandle() ManagedHandle { + var toClose []*idleHandle + defer func() { + // NOTE: Must keep the closure around to late bind the toClose slice. + p.closeHandles(toClose) + }() + + now := p.options.getCurrentTime() + + p.mutex.Lock() + defer p.mutex.Unlock() + + var i int + for i = 0; i < len(p.idleHandles); i++ { + idle := p.idleHandles[i] + if idle.keepUntil == nil || now.Before(*idle.keepUntil) { + break + } + } + if i > 0 { + toClose = p.idleHandles[0:i] + } + + if i < len(p.idleHandles) { + idle := p.idleHandles[i] + p.idleHandles = p.idleHandles[i+1:] + return NewManagedHandle(p.location, idle.handle, p, p.options) + } + + if len(p.idleHandles) > 0 { + p.idleHandles = []*idleHandle{} + } + return nil +} + +// This adds an idle resource to the pool. +func (p *simpleResourcePool) queueIdleHandles(handle interface{}) { + var toClose []*idleHandle + defer func() { + // NOTE: Must keep the closure around to late bind the toClose slice.
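+ // (toClose is only assigned later, while the lock is held; passing it + // as an argument here would capture the still-nil slice.)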
+ p.closeHandles(toClose) + }() + + now := p.options.getCurrentTime() + var keepUntil *time.Time + if p.options.MaxIdleTime != nil { + // NOTE: Assign to temp variable first to work around compiler bug + x := now.Add(*p.options.MaxIdleTime) + keepUntil = &x + } + + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.isLameDuck { + toClose = []*idleHandle{ + {handle: handle}, + } + return + } + + p.idleHandles = append( + p.idleHandles, + &idleHandle{ + handle: handle, + keepUntil: keepUntil, + }) + + nIdleHandles := uint32(len(p.idleHandles)) + if nIdleHandles > p.options.MaxIdleHandles { + handlesToClose := nIdleHandles - p.options.MaxIdleHandles + toClose = p.idleHandles[0:handlesToClose] + p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles] + } +} + +// Closes resources; at this point it is assumed that these resources +// are no longer referenced from the main idleHandles slice. +func (p *simpleResourcePool) closeHandles(handles []*idleHandle) { + for _, handle := range handles { + _ = p.options.Close(handle.handle) + } +} diff --git a/weed/wdclient/volume_tcp_client.go b/weed/wdclient/volume_tcp_client.go new file mode 100644 index 000000000..afebd71eb --- /dev/null +++ b/weed/wdclient/volume_tcp_client.go @@ -0,0 +1,97 @@ +package wdclient + +import ( + "bufio" + "bytes" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient/net2" + "io" + "net" + "time" +) + +// VolumeTcpClient puts/gets/deletes file chunks directly on volume servers without replication +type VolumeTcpClient struct { + cp net2.ConnectionPool +} + +type VolumeTcpConn struct { + net.Conn + bufWriter *bufio.Writer + bufReader *bufio.Reader +} + +func NewVolumeTcpClient() *VolumeTcpClient { + MaxIdleTime := 10 * time.Second + return &VolumeTcpClient{ + cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{ + MaxActiveConnections: 16, + MaxIdleConnections: 1, + MaxIdleTime: &MaxIdleTime, + DialMaxConcurrency: 0, + Dial: func(network string, address string) (net.Conn, error) { + conn, err := net.Dial(network, address) + return &VolumeTcpConn{ + conn, + bufio.NewWriter(conn), + bufio.NewReader(conn), + }, err + }, + NowFunc: nil, + ReadTimeout: 0, + WriteTimeout: 0, + }), + } +} +func (c *VolumeTcpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) { + + tcpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20000) + if parseErr != nil { + return parseErr + } + + c.cp.Register("tcp", tcpAddress) + tcpConn, getErr := c.cp.Get("tcp", tcpAddress) + if getErr != nil { + return fmt.Errorf("get connection to %s: %v", tcpAddress, getErr) + } + conn := tcpConn.RawConn().(*VolumeTcpConn) + defer func() { + if err != nil { + tcpConn.DiscardConnection() + } else { + tcpConn.ReleaseConnection() + } + }() + + buf := []byte("+" + fileId + "\n") + _, err = conn.bufWriter.Write(buf) + if err != nil { + return + } + util.Uint32toBytes(buf[0:4], fileSize) + _, err = conn.bufWriter.Write(buf[0:4]) + if err != nil { + return + } + _, err = io.Copy(conn.bufWriter, fileReader) + if err != nil { + return + } + if _, err = conn.bufWriter.Write([]byte("!\n")); err != nil { + return + } + if err = conn.bufWriter.Flush(); err != nil { + return + } + + ret, _, err := conn.bufReader.ReadLine() + if err != nil { + glog.V(0).Infof("upload by tcp: %v", err) + return + } + if !bytes.HasPrefix(ret, []byte("+OK")) { + glog.V(0).Infof("upload by tcp: %v", string(ret)) + } + + return nil +}
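A minimal usage sketch for the new TCP client (an illustration under stated assumptions, not part of the patch: the volume server address and file id below are hypothetical, and the server must expose the TCP listener whose address PutFileChunk derives via pb.ParseServerAddress(volumeServerAddress, 20000)):

package main

import (
	"bytes"
	"log"

	"github.com/chrislusf/seaweedfs/weed/wdclient"
)

func main() {
	client := wdclient.NewVolumeTcpClient()
	payload := []byte("hello world")
	// "localhost:8080" and "3,01637037d6" are placeholder values.
	err := client.PutFileChunk("localhost:8080", "3,01637037d6",
		uint32(len(payload)), bytes.NewReader(payload))
	if err != nil {
		log.Fatalf("put chunk: %v", err)
	}
}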

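For reference, the request framing that PutFileChunk writes, as read off the code above:

	"+<fileId>\n"       command line announcing an upload
	4 bytes             file size, encoded by util.Uint32toBytes
	<fileSize bytes>    the chunk payload
	"!\n"               terminator

The volume server is expected to answer with a line starting with "+OK"; any other reply is only logged, while a transport error causes the pooled connection to be discarded rather than released.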