Diffstat (limited to 'weed')
-rw-r--r--  weed/Makefile | 4
-rw-r--r--  weed/command/benchmark.go | 25
-rw-r--r--  weed/command/command.go | 2
-rw-r--r--  weed/command/filer_backup.go | 157
-rw-r--r--  weed/command/filer_meta_backup.go | 268
-rw-r--r--  weed/command/filer_meta_tail.go | 8
-rw-r--r--  weed/command/filer_replication.go | 29
-rw-r--r--  weed/command/filer_sync.go | 203
-rw-r--r--  weed/command/master.go | 1
-rw-r--r--  weed/command/mount_std.go | 23
-rw-r--r--  weed/command/msg_broker.go | 2
-rw-r--r--  weed/command/s3.go | 2
-rw-r--r--  weed/command/scaffold.go | 15
-rw-r--r--  weed/command/server.go | 13
-rw-r--r--  weed/command/volume.go | 26
-rw-r--r--  weed/command/webdav.go | 2
-rw-r--r--  weed/filer/filer_delete_entry.go | 6
-rw-r--r--  weed/filer/filer_on_meta_event.go | 5
-rw-r--r--  weed/filer/meta_aggregator.go | 1
-rw-r--r--  weed/filer/stream.go | 8
-rw-r--r--  weed/filesys/dir.go | 14
-rw-r--r--  weed/filesys/dir_link.go | 29
-rw-r--r--  weed/filesys/dirty_page.go | 9
-rw-r--r--  weed/filesys/file.go | 51
-rw-r--r--  weed/filesys/filehandle.go | 75
-rw-r--r--  weed/filesys/fscache.go | 5
-rw-r--r--  weed/operation/upload_content.go | 2
-rw-r--r--  weed/pb/filer_pb/filer_client.go | 14
-rw-r--r--  weed/pb/grpc_client_server.go | 65
-rw-r--r--  weed/pb/volume_info.go | 29
-rw-r--r--  weed/replication/repl_util/replication_util.go (renamed from weed/replication/repl_util/replication_utli.go) | 2
-rw-r--r--  weed/replication/replicator.go | 2
-rw-r--r--  weed/replication/sink/azuresink/azure_sink.go | 14
-rw-r--r--  weed/replication/sink/b2sink/b2_sink.go | 14
-rw-r--r--  weed/replication/sink/filersink/filer_sink.go | 6
-rw-r--r--  weed/replication/sink/gcssink/gcs_sink.go | 14
-rw-r--r--  weed/replication/sink/localsink/local_sink.go | 8
-rw-r--r--  weed/replication/sink/replication_sink.go | 1
-rw-r--r--  weed/replication/sink/s3sink/s3_sink.go | 29
-rw-r--r--  weed/replication/sink/s3sink/s3_write.go | 13
-rw-r--r--  weed/replication/source/filer_source.go | 8
-rw-r--r--  weed/s3api/filer_util.go | 4
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go | 2
-rw-r--r--  weed/s3api/s3api_object_handlers.go | 20
-rw-r--r--  weed/s3api/s3api_object_handlers_postpolicy.go | 2
-rw-r--r--  weed/security/tls.go | 66
-rw-r--r--  weed/server/filer_server.go | 2
-rw-r--r--  weed/server/filer_server_handlers_read.go | 11
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go | 11
-rw-r--r--  weed/server/master_grpc_server_volume.go | 7
-rw-r--r--  weed/server/volume_server_tcp_handlers_write.go | 137
-rw-r--r--  weed/shell/command_ec_encode.go | 2
-rw-r--r--  weed/shell/command_s3_clean_uploads.go | 92
-rw-r--r--  weed/shell/command_volume_tier_move.go | 2
-rw-r--r--  weed/storage/backend/backend.go | 6
-rw-r--r--  weed/storage/backend/disk_file.go | 2
-rw-r--r--  weed/storage/backend/s3_backend/s3_sessions.go | 5
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go | 2
-rw-r--r--  weed/storage/needle/crc.go | 23
-rw-r--r--  weed/storage/needle/needle_read_write.go | 2
-rw-r--r--  weed/storage/needle_map_leveldb.go | 6
-rw-r--r--  weed/storage/needle_map_sorted_file.go | 8
-rw-r--r--  weed/storage/store.go | 18
-rw-r--r--  weed/storage/super_block/replica_placement.go | 3
-rw-r--r--  weed/storage/volume_loading.go | 10
-rw-r--r--  weed/storage/volume_read_write.go | 60
-rw-r--r--  weed/storage/volume_stream_write.go | 104
-rw-r--r--  weed/storage/volume_tier.go | 13
-rw-r--r--  weed/storage/volume_vacuum.go | 2
-rw-r--r--  weed/topology/data_node_ec.go | 17
-rw-r--r--  weed/topology/volume_layout.go | 2
-rw-r--r--  weed/topology/volume_location_list.go | 2
-rw-r--r--  weed/util/constants.go | 2
-rw-r--r--  weed/util/fasthttp_util.go | 1
-rw-r--r--  weed/util/fla9/fla9.go | 5
-rw-r--r--  weed/wdclient/net2/base_connection_pool.go | 159
-rw-r--r--  weed/wdclient/net2/connection_pool.go | 97
-rw-r--r--  weed/wdclient/net2/doc.go | 6
-rw-r--r--  weed/wdclient/net2/ip.go | 177
-rw-r--r--  weed/wdclient/net2/managed_connection.go | 185
-rw-r--r--  weed/wdclient/net2/port.go | 19
-rw-r--r--  weed/wdclient/resource_pool/doc.go | 5
-rw-r--r--  weed/wdclient/resource_pool/managed_handle.go | 97
-rw-r--r--  weed/wdclient/resource_pool/multi_resource_pool.go | 200
-rw-r--r--  weed/wdclient/resource_pool/resource_pool.go | 96
-rw-r--r--  weed/wdclient/resource_pool/semaphore.go | 154
-rw-r--r--  weed/wdclient/resource_pool/simple_resource_pool.go | 343
-rw-r--r--  weed/wdclient/volume_tcp_client.go | 97
88 files changed, 3091 insertions, 399 deletions
diff --git a/weed/Makefile b/weed/Makefile
index fd0843c22..8f1257d09 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -33,3 +33,7 @@ debug_webdav:
debug_s3:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3
+
+debug_filer_copy:
+ go build -gcflags="all=-N -l"
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index c1bc80c42..af0793c70 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -41,6 +41,7 @@ type BenchmarkOptions struct {
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
fsync *bool
+ useTcp *bool
}
var (
@@ -67,6 +68,7 @@ func init() {
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
+ b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
sharedBytes = make([]byte, 1024)
}
@@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
+ volumeTcpClient := wdclient.NewVolumeTcpClient()
+
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
@@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
- if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
+ if *b.useTcp {
+ if uploadByTcp(volumeTcpClient, fp) {
+ fileIdLineChan <- fp.Fid
+ s.completed++
+ s.transferred += fileSize
+ } else {
+ s.failed++
+ }
+ } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b
}
}
+func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
+
+ err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
+ if err != nil {
+ glog.Errorf("upload chunk err: %v", err)
+ return false
+ }
+
+ return true
+}
+
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {
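
Note: the new -useTcp flag assumes each volume server also exposes the experimental TCP port added later in this commit (weed/command/volume.go listens on the HTTP port + 20000). Assuming default ports, a TCP benchmark run would pair the two new flags:

    weed volume -tcp -port=8080    # also accepts TCP writes on :28080
    weed benchmark -useTcp
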
diff --git a/weed/command/command.go b/weed/command/command.go
index bbc2e0423..a9063eaa0 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -15,7 +15,9 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
+ cmdFilerBackup,
cmdFilerCat,
+ cmdFilerMetaBackup,
cmdFilerMetaTail,
cmdFilerReplicate,
cmdFilerSynchronize,
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
new file mode 100644
index 000000000..888b46fe7
--- /dev/null
+++ b/weed/command/filer_backup.go
@@ -0,0 +1,157 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "io"
+ "time"
+)
+
+type FilerBackupOptions struct {
+ isActivePassive *bool
+ filer *string
+ path *string
+ debug *bool
+ proxyByFiler *bool
+ timeAgo *time.Duration
+}
+
+var (
+ filerBackupOptions FilerBackupOptions
+)
+
+func init() {
+ cmdFilerBackup.Run = runFilerBackup // break init cycle
+ filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
+ filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
+ filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
+ filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
+ filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerBackup = &Command{
+ UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
+ Short:     "resumable, continuous replication of files from a SeaweedFS cluster to another location defined in replication.toml",
+ Long: `resumable, continuous replication of files from a SeaweedFS cluster to another location defined in replication.toml
+
+ filer.backup listens to filer notifications. When a file is updated, it fetches the updated content
+ and writes it to the destination. This is intended to replace the filer.replicate command, since no additional message queue is needed.
+
+ If restarted and "-timeAgo" is not set, the synchronization resumes from the previous checkpoint, which is persisted every few seconds.
+ A fresh sync starts from the earliest metadata logs. To re-sync from far back in time (effectively resetting the checkpoint), set "-timeAgo" to a large value.
+
+`,
+}
+
+func runFilerBackup(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("replication", true)
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err := doFilerBackup(grpcDialOption, &filerBackupOptions)
+ if err != nil {
+ glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+
+ return true
+}
+
+const (
+ BackupKeyPrefix = "backup."
+)
+
+func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
+
+ // find data sink
+ config := util.GetViper()
+ dataSink := findSink(config)
+ if dataSink == nil {
+ return fmt.Errorf("no data sink configured in replication.toml")
+ }
+
+ sourceFiler := *backupOption.filer
+ sourcePath := *backupOption.path
+ timeAgo := *backupOption.timeAgo
+ targetPath := dataSink.GetSinkToDirectory()
+ debug := *backupOption.debug
+
+ // get start time for the data sink
+ startFrom := time.Unix(0, 0)
+ sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
+ if timeAgo.Milliseconds() == 0 {
+ lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
+ if err != nil {
+ glog.V(0).Infof("starting from %v", startFrom)
+ } else {
+ startFrom = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resuming from %v", startFrom)
+ }
+ } else {
+ startFrom = time.Now().Add(-timeAgo)
+ glog.V(0).Infof("start time is set to %v", startFrom)
+ }
+
+ // create filer sink
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
+ dataSink.SetSourceFiler(filerSource)
+
+ processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
+
+ return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "backup_" + dataSink.GetName(),
+ PathPrefix: sourcePath,
+ SinceNs: startFrom.UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("processEventFn: %v", err)
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
+ return fmt.Errorf("setOffset: %v", err)
+ }
+ }
+
+ }
+
+ })
+
+}
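
The backup checkpoint is stored per destination: the sink name plus its target directory are hashed into a 4-byte signature, and the offset lives under the "backup." key prefix via the shared getOffset/setOffset helpers shown below in filer_sync.go. A minimal sketch of the key construction, assuming the weed/util helpers behave as they are used in this commit:

    package main

    import (
    	"fmt"

    	"github.com/chrislusf/seaweedfs/weed/util"
    )

    const BackupKeyPrefix = "backup."

    // backupCheckpointKey mirrors how doFilerBackup derives its KV key:
    // the "backup." prefix followed by the 4-byte sink signature.
    func backupCheckpointKey(sinkName, sinkDirectory string) []byte {
    	sinkId := int32(util.HashStringToLong(sinkName + sinkDirectory))
    	key := []byte(BackupKeyPrefix + "____") // 4 placeholder bytes
    	util.Uint32toBytes(key[len(BackupKeyPrefix):], uint32(sinkId))
    	return key
    }

    func main() {
    	fmt.Printf("key: %x\n", backupCheckpointKey("s3", "/backup"))
    }
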
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
new file mode 100644
index 000000000..ba0b44659
--- /dev/null
+++ b/weed/command/filer_meta_backup.go
@@ -0,0 +1,268 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
+ "io"
+ "reflect"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ metaBackup FilerMetaBackupOptions
+)
+
+type FilerMetaBackupOptions struct {
+ grpcDialOption grpc.DialOption
+ filerAddress *string
+ filerDirectory *string
+ restart *bool
+ backupFilerConfig *string
+
+ store filer.FilerStore
+}
+
+func init() {
+ cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
+ metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
+ metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
+ metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
+}
+
+var cmdFilerMetaBackup = &Command{
+ UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
+ Short:     "continuously back up filer metadata changes to another filer store specified in a backup_filer.toml",
+ Long: `continuously back up filer metadata changes.
+The backup writes to another filer store specified in a backup_filer.toml.
+
+ weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
+ weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
+
+ `,
+}
+
+func runFilerMetaBackup(cmd *Command, args []string) bool {
+
+ metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ // load backup_filer.toml
+ v := viper.New()
+ v.SetConfigFile(*metaBackup.backupFilerConfig)
+
+ if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
+ glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
+ " weed scaffold -config=%s -output=.\n\n\n",
+ *metaBackup.backupFilerConfig, "backup_filer", "filer")
+ }
+
+ if err := metaBackup.initStore(v); err != nil {
+ glog.V(0).Infof("init backup filer store: %v", err)
+ return true
+ }
+
+ missingPreviousBackup := false
+ _, err := metaBackup.getOffset()
+ if err != nil {
+ missingPreviousBackup = true
+ }
+
+ if *metaBackup.restart || missingPreviousBackup {
+ glog.V(0).Infof("traversing metadata tree...")
+ startTime := time.Now()
+ if err := metaBackup.traverseMetadata(); err != nil {
+ glog.Errorf("traverse meta data: %v", err)
+ return true
+ }
+ glog.V(0).Infof("metadata copied up to %v", startTime)
+ if err := metaBackup.setOffset(startTime); err != nil {
+ startTime = time.Now()
+ }
+ }
+
+ for {
+ err := metaBackup.streamMetadataBackup()
+ if err != nil {
+ glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+
+ return true
+}
+
+func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
+ // load configuration for default filer store
+ hasDefaultStoreConfigured := false
+ for _, store := range filer.Stores {
+ if v.GetBool(store.GetName() + ".enabled") {
+ store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
+ if err := store.Initialize(v, store.GetName()+"."); err != nil {
+ glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
+ }
+ glog.V(0).Infof("configured filer store to %s", store.GetName())
+ hasDefaultStoreConfigured = true
+ metaBackup.store = filer.NewFilerStoreWrapper(store)
+ break
+ }
+ }
+ if !hasDefaultStoreConfigured {
+ return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
+ }
+
+ return nil
+}
+
+func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
+ var saveErr error
+
+ traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+
+ println("+", parentPath.Child(entry.Name))
+ if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
+ saveErr = fmt.Errorf("insert entry error: %v", err)
+ return
+ }
+
+ })
+
+ if traverseErr != nil {
+ return fmt.Errorf("traverse: %v", traverseErr)
+ }
+ return saveErr
+}
+
+var (
+ MetaBackupKey = []byte("metaBackup")
+)
+
+func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
+
+ startTime, err := metaBackup.getOffset()
+ if err != nil {
+ startTime = time.Now()
+ }
+ glog.V(0).Infof("streaming from %v", startTime)
+
+ store := metaBackup.store
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+
+ ctx := context.Background()
+ message := resp.EventNotification
+
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
+ return store.InsertEntry(ctx, entry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
+ return store.UpdateEntry(ctx, entry)
+ }
+ println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
+ return err
+ }
+ println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
+ }
+
+ return nil
+ }
+
+ tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "meta_backup",
+ PathPrefix: *metaBackup.filerDirectory,
+ SinceNs: startTime.UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ if err = eachEntryFunc(resp); err != nil {
+ return err
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
+ return err2
+ }
+ }
+
+ }
+
+ })
+ return tailErr
+}
+
+func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
+ value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
+ if err != nil {
+ return
+ }
+ tsNs := util.BytesToUint64(value)
+
+ return time.Unix(0, int64(tsNs)), nil
+}
+
+func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
+ valueBuf := make([]byte, 8)
+ util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
+
+ if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
+ return err
+ }
+ return nil
+}
+
+var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
+
+func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return fn(client)
+ })
+
+}
+
+func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
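
The meta backup checkpoint is simply the last processed timestamp, persisted as 8 bytes under the fixed "metaBackup" key. A self-contained sketch of the round-trip performed by setOffset/getOffset above (big-endian encoding assumed, matching util.Uint64toBytes/util.BytesToUint64):

    package main

    import (
    	"encoding/binary"
    	"fmt"
    	"time"
    )

    // encodeOffset/decodeOffset mirror the checkpoint round-trip: a time.Time
    // is stored as its uint64 nanosecond timestamp in 8 big-endian bytes.
    func encodeOffset(t time.Time) []byte {
    	buf := make([]byte, 8)
    	binary.BigEndian.PutUint64(buf, uint64(t.UnixNano()))
    	return buf
    }

    func decodeOffset(buf []byte) time.Time {
    	return time.Unix(0, int64(binary.BigEndian.Uint64(buf)))
    }

    func main() {
    	now := time.Now()
    	fmt.Println(decodeOffset(encodeOffset(now)).Equal(now)) // true
    }
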
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index f055b19a8..8451ffd78 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -23,9 +23,9 @@ func init() {
}
var cmdFilerMetaTail = &Command{
- UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
- Short: "see recent changes on a filer",
- Long: `See recent changes on a filer.
+ UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
+ Short: "see continuous changes on a filer",
+ Long: `See continuous changes on a filer.
weed filer.meta.tail -timeAgo=30h | grep truncate
weed filer.meta.tail -timeAgo=30h | jq .
@@ -36,7 +36,7 @@ var cmdFilerMetaTail = &Command{
var (
tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
- tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index e8c06b208..885c95540 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
}
- var dataSink sink.ReplicationSink
- for _, sk := range sink.Sinks {
- if config.GetBool("sink." + sk.GetName() + ".enabled") {
- if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
- glog.Fatalf("Failed to initialize sink for %s: %+v",
- sk.GetName(), err)
- }
- glog.V(0).Infof("Configure sink to %s", sk.GetName())
- dataSink = sk
- break
- }
- }
+ dataSink := findSink(config)
if dataSink == nil {
println("no data sink configured in replication.toml:")
@@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
+func findSink(config *util.ViperProxy) sink.ReplicationSink {
+ var dataSink sink.ReplicationSink
+ for _, sk := range sink.Sinks {
+ if config.GetBool("sink." + sk.GetName() + ".enabled") {
+ if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
+ glog.Fatalf("Failed to initialize sink for %s: %+v",
+ sk.GetName(), err)
+ }
+ glog.V(0).Infof("Configure sink to %s", sk.GetName())
+ dataSink = sk
+ break
+ }
+ }
+ return dataSink
+}
+
func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index f8beb6fda..0f34e5701 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// if first time, start from now
// if has previously synced, resume from that point of time
- sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
+ sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
if err != nil {
return err
}
@@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
+ persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
+
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
-
- var sourceOldKey, sourceNewKey util.FullPath
- if message.OldEntry != nil {
- sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
- }
- if message.NewEntry != nil {
- sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
- }
-
for _, sig := range message.Signatures {
if sig == targetFilerSignature && targetFilerSignature != 0 {
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
return nil
}
}
- if debug {
- fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
- }
-
- if !strings.HasPrefix(resp.Directory, sourcePath) {
- return nil
- }
-
- // handle deletions
- if message.OldEntry != nil && message.NewEntry == nil {
- if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
- return nil
- }
- key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
- }
-
- // handle new entries
- if message.OldEntry == nil && message.NewEntry != nil {
- if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
- return nil
- }
- key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
- }
-
- // this is something special?
- if message.OldEntry == nil && message.NewEntry == nil {
- return nil
- }
-
- // handle updates
- if strings.HasPrefix(string(sourceOldKey), sourcePath) {
- // old key is in the watched directory
- if strings.HasPrefix(string(sourceNewKey), sourcePath) {
- // new key is also in the watched directory
- oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
- foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
- if foundExisting {
- return err
- }
-
- // not able to find old entry
- if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
- return fmt.Errorf("delete old entry %v: %v", oldKey, err)
- }
-
- // create the new entry
- newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
-
- } else {
- // new key is outside of the watched directory
- key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
- }
- } else {
- // old key is outside of the watched directory
- if strings.HasPrefix(string(sourceNewKey), sourcePath) {
- // new key is in the watched directory
- key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
- } else {
- // new key is also outside of the watched directory
- // skip
- }
- }
-
- return nil
+ return persistEventFn(resp)
}
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
- if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
+ if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
return err
}
}
@@ -290,11 +215,11 @@ const (
SyncKeyPrefix = "sync."
)
-func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
+func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- syncKey := []byte(SyncKeyPrefix + "____")
- util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+ syncKey := []byte(signaturePrefix + "____")
+ util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
if err != nil {
@@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature
}
-func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
+func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- syncKey := []byte(SyncKeyPrefix + "____")
- util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+ syncKey := []byte(signaturePrefix + "____")
+ util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
valueBuf := make([]byte, 8)
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
@@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur
})
}
+
+func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
+ // process function
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ var sourceOldKey, sourceNewKey util.FullPath
+ if message.OldEntry != nil {
+ sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
+ }
+ if message.NewEntry != nil {
+ sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
+ }
+
+ if debug {
+ glog.V(0).Infof("received %v", resp)
+ }
+
+ if !strings.HasPrefix(resp.Directory, sourcePath) {
+ return nil
+ }
+
+ // handle deletions
+ if message.OldEntry != nil && message.NewEntry == nil {
+ if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ return nil
+ }
+ key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+
+ // handle new entries
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ return nil
+ }
+ key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ }
+
+ // neither old entry nor new entry exists: nothing to do
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+
+ // handle updates
+ if strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ // old key is in the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is also in the watched directory
+ if !dataSink.IsIncremental() {
+ oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
+ foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
+ if foundExisting {
+ return err
+ }
+
+ // not able to find old entry
+ if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
+ return fmt.Errorf("delete old entry %v: %v", oldKey, err)
+ }
+ }
+ // create the new entry
+ newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
+
+ } else {
+ // new key is outside of the watched directory
+ if !dataSink.IsIncremental() {
+ key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+ }
+ } else {
+ // old key is outside of the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is in the watched directory
+ key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ } else {
+ // new key is also outside of the watched directory
+ // skip
+ }
+ }
+
+ return nil
+ }
+ return processEventFn
+}
+
+func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
+ if !dataSink.IsIncremental() {
+ return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
+ }
+ var mTime int64
+ if message.NewEntry != nil {
+ mTime = message.NewEntry.Attributes.Mtime
+ } else if message.OldEntry != nil {
+ mTime = message.OldEntry.Attributes.Mtime
+ }
+ dateKey := time.Unix(mTime, 0).Format("2006-01-02")
+ return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
+}
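
buildKey above is the only place the new IsIncremental() flag changes path layout: non-incremental sinks re-root the source path under targetPath, while incremental sinks insert a yyyy-mm-dd directory derived from the entry's mtime. A worked sketch with the sink reduced to a bool:

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/chrislusf/seaweedfs/weed/util"
    )

    // buildKeySketch reproduces the two branches of buildKey.
    func buildKeySketch(isIncremental bool, targetPath, sourceKey, sourcePath string, mtime int64) string {
    	if !isIncremental {
    		return util.Join(targetPath, sourceKey[len(sourcePath):])
    	}
    	dateKey := time.Unix(mtime, 0).Format("2006-01-02")
    	return util.Join(targetPath, dateKey, sourceKey[len(sourcePath):])
    }

    func main() {
    	mtime := time.Date(2021, 2, 14, 10, 0, 0, 0, time.UTC).Unix()
    	fmt.Println(buildKeySketch(false, "/backup", "/data/a.txt", "/data", mtime)) // /backup/a.txt
    	fmt.Println(buildKeySketch(true, "/backup", "/data/a.txt", "/data", mtime))  // /backup/2021-02-14/a.txt
    }
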
diff --git a/weed/command/master.go b/weed/command/master.go
index d569919cd..fb58cfefd 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -138,7 +138,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
- // Create your protocol servers.
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index a6d562d40..8da69d0ac 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -53,7 +53,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
filer := *option.filer
// parse filer grpc address
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer)
if err != nil {
glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
return true
@@ -63,16 +63,23 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ for i := 0; i < 10; i++ {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
		if err != nil {
- return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
+ glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ glog.V(0).Infof("wait for %d seconds ...", i+1)
+ time.Sleep(time.Duration(i+1) * time.Second)
+ continue
		}
- cipher = resp.Cipher
- return nil
- })
+ break
+ }
if err != nil {
- glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err)
return true
}
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index b4b5855ff..db0b4148d 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -63,7 +63,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer)
if err != nil {
glog.Fatal(err)
return false
diff --git a/weed/command/s3.go b/weed/command/s3.go
index d8e3e306b..c8292a7d5 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -137,7 +137,7 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 993391a42..07d448042 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -356,6 +356,9 @@ directory = "/buckets"
[sink.local]
enabled = false
directory = "/data"
+# when is_incremental = true, replicated files are grouped under yyyy-mm-dd directories
+# named by modification time, so each date directory contains that day's new and updated files.
+is_incremental = false
[sink.local_incremental]
# all replicated files are under modified time as yyyy-mm-dd directories
@@ -373,6 +376,7 @@ directory = "/backup"
replication = ""
collection = ""
ttlSec = 0
+is_incremental = false
[sink.s3]
# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
@@ -384,6 +388,7 @@ region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "/" # destination directory
endpoint = ""
+is_incremental = false
[sink.google_cloud_storage]
# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
@@ -391,6 +396,7 @@ enabled = false
google_application_credentials = "/path/to/x.json" # path to json credential file
bucket = "your_bucket_seaweedfs" # an existing bucket
directory = "/" # destination directory
+is_incremental = false
[sink.azure]
# experimental, let me know if it works
@@ -399,6 +405,7 @@ account_name = ""
account_key = ""
container = "mycontainer" # an existing container
directory = "/" # destination directory
+is_incremental = false
[sink.backblaze]
enabled = false
@@ -406,6 +413,7 @@ b2_account_id = ""
b2_master_application_key = ""
bucket = "mybucket" # an existing bucket
directory = "/" # destination directory
+is_incremental = false
`
@@ -432,22 +440,28 @@ expires_after_seconds = 10 # seconds
# the host name is not checked, so the PERM files can be shared.
[grpc]
ca = ""
+# Set a wildcard domain to enable TLS authentication by common names
+allowed_wildcard_domain = "" # .mycompany.com
[grpc.volume]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.master]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.filer]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.msg_broker]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
# use this for any place needs a grpc client
# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
@@ -455,7 +469,6 @@ key = ""
cert = ""
key = ""
-
# volume server https options
# Note: work in progress!
# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
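
Each sink presumably reads its new is_incremental key during Initialize and reports it via IsIncremental(), which genProcessFunction and buildKey consult above; the per-sink changes are listed in the diffstat but not shown in this view. A hypothetical stand-in for that wiring:

    package main

    import "fmt"

    // boolConfig stands in for the util.Configuration a sink receives.
    type boolConfig map[string]bool

    func (c boolConfig) GetBool(key string) bool { return c[key] }

    // exampleSink sketches how a sink could pick up the is_incremental key.
    type exampleSink struct{ isIncremental bool }

    func (s *exampleSink) Initialize(config boolConfig, prefix string) {
    	s.isIncremental = config.GetBool(prefix + "is_incremental")
    }

    func (s *exampleSink) IsIncremental() bool { return s.isIncremental }

    func main() {
    	s := &exampleSink{}
    	s.Initialize(boolConfig{"sink.local.is_incremental": true}, "sink.local.")
    	fmt.Println(s.IsIncremental()) // true
    }
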
diff --git a/weed/command/server.go b/weed/command/server.go
index 611578953..a39802412 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -111,6 +111,7 @@ func init() {
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<experimental> enable tcp port")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
@@ -156,19 +157,21 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true
}
- _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
- peers := strings.Join(peerList, ",")
- masterOptions.peers = &peers
+ if *isStartingMasterServer {
+ _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
+ peers := strings.Join(peerList, ",")
+ masterOptions.peers = &peers
+ }
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = &peers
+ filerOptions.masters = masterOptions.peers
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
- serverOptions.v.masters = &peers
+ serverOptions.v.masters = masterOptions.peers
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 659c93d96..f49ece9dc 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -62,6 +62,7 @@ type VolumeServerOptions struct {
preStopSeconds *int
metricsHttpPort *int
// pulseSeconds *int
+ enableTcp *bool
}
func init() {
@@ -88,6 +89,7 @@ func init() {
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
+ v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port")
}
var cmdVolume = &Command{
@@ -251,6 +253,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
+ // starting tcp server
+ if *v.enableTcp {
+ go v.startTcpService(volumeServer)
+ }
+
// starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux)
@@ -368,3 +375,22 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}()
return clusterHttpServer
}
+
+func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
+ listener, e := util.NewListener(listeningAddress, 0)
+ if e != nil {
+ glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
+ }
+ defer listener.Close()
+
+ for {
+ c, err := listener.Accept()
+ if err != nil {
+ glog.Errorf("volume server tcp accept: %v", err)
+ return
+ }
+ go volumeServer.HandleTcpConnection(c)
+ }
+}
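
The TCP service binds the same IP as the HTTP server and derives its port as the HTTP port + 20000. The wire framing lives in wdclient.VolumeTcpClient and weed/server/volume_server_tcp_handlers_write.go (not shown here); this sketch only shows how a client would derive and dial the address:

    package main

    import (
    	"fmt"
    	"net"
    )

    // dialVolumeTcp derives the experimental TCP address from a volume
    // server's HTTP host and port, following the port+20000 convention.
    func dialVolumeTcp(host string, httpPort int) (net.Conn, error) {
    	addr := fmt.Sprintf("%s:%d", host, httpPort+20000)
    	return net.Dial("tcp", addr)
    }

    func main() {
    	conn, err := dialVolumeTcp("localhost", 8080) // dials localhost:28080
    	if err != nil {
    		fmt.Println("dial:", err)
    		return
    	}
    	defer conn.Close()
    }
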
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 2bd4a3c61..781ea1e36 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -78,7 +78,7 @@ func (wo *WebDavOption) startWebDav() bool {
}
// parse filer grpc address
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer)
if err != nil {
glog.Fatal(err)
return false
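
mount, msg.broker, s3, and webdav all switch from ParseFilerGrpcAddress to the more general pb.ParseServerToGrpcAddress (its implementation change in weed/pb/grpc_client_server.go appears in the diffstat but not in this view). Assuming the usual SeaweedFS convention that a server's gRPC port is its HTTP port + 10000, the mapping looks like:

    package main

    import (
    	"fmt"
    	"net"
    	"strconv"
    )

    // serverToGrpcAddress maps an HTTP address to the matching gRPC address
    // using the port+10000 convention.
    func serverToGrpcAddress(server string) (string, error) {
    	host, portStr, err := net.SplitHostPort(server)
    	if err != nil {
    		return "", fmt.Errorf("parse address %s: %v", server, err)
    	}
    	port, err := strconv.Atoi(portStr)
    	if err != nil {
    		return "", fmt.Errorf("parse port of %s: %v", server, err)
    	}
    	return net.JoinHostPort(host, strconv.Itoa(port+10000)), nil
    }

    func main() {
    	fmt.Println(serverToGrpcAddress("localhost:8888")) // localhost:18888 <nil>
    }
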
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 50a669f40..bedf2f4d1 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -11,6 +11,10 @@ import (
type HardLinkId []byte
+const (
+ MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
+)
+
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
if p == "/" {
return nil
@@ -77,7 +81,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
if lastFileName == "" && !isRecursive && len(entries) > 0 {
// only for first iteration in the loop
glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ return nil, nil, fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath)
}
for _, sub := range entries {
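
Extracting the message into MsgFailDelNonEmptyFolder lets callers classify this error without parsing the folder path appended after it; command_s3_clean_uploads.go in the diffstat is a plausible consumer. A hypothetical check:

    package main

    import (
    	"fmt"
    	"strings"
    )

    const MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"

    // isNonEmptyFolderErr reports whether err carries the shared marker message.
    func isNonEmptyFolderErr(err error) bool {
    	return err != nil && strings.Contains(err.Error(), MsgFailDelNonEmptyFolder)
    }

    func main() {
    	err := fmt.Errorf("%s: /buckets/uploads", MsgFailDelNonEmptyFolder)
    	fmt.Println(isNonEmptyFolderErr(err)) // true
    }
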
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index 295a5039e..b29324b61 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -11,6 +11,10 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
+ f.maybeReloadFilerConfiguration(event)
+}
+
+func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) {
if DirectoryEtcSeaweedFS != event.Directory {
if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath {
return
@@ -26,7 +30,6 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse)
if entry.Name == FilerConfName {
f.reloadFilerConfiguration(entry)
}
-
}
func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 9437e9992..5c368a57e 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -69,6 +69,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
peerSignature, err = ma.readFilerStoreSignature(peer)
}
+ // when filer store is not shared by multiple filers
if peerSignature != f.Signature {
if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
lastTsNs = prevTsNs
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 075204b79..573ab65e8 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -15,7 +15,7 @@ import (
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- // fmt.Printf("start to stream content for chunks: %+v\n", chunks)
+ glog.V(9).Infof("start to stream content for chunks: %+v", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -26,6 +26,9 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
+ } else if len(urlStrings) == 0 {
+ glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+ return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
}
fileId2Url[chunkView.FileId] = urlStrings
}
@@ -39,6 +42,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
glog.Errorf("read chunk: %v", err)
return fmt.Errorf("read chunk: %v", err)
}
+
_, err = w.Write(data)
if err != nil {
glog.Errorf("write chunk: %v", err)
@@ -181,7 +185,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 10a0a2b44..33e1a0a3a 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -251,10 +251,10 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
- glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
-
- fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath())
+ glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
+
+ fullFilePath := dirPath.Child(req.Name)
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
if visitErr != nil {
glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
@@ -305,7 +305,8 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath())
+ dirPath := util.FullPath(dir.FullPath())
+ glog.V(4).Infof("dir ReadDirAll %s", dirPath)
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
if entry.IsDirectory {
@@ -318,12 +319,11 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
return nil
}
- dirPath := util.FullPath(dir.FullPath())
if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
- listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
processEachEntryFn(entry.ToProtoEntry(), false)
return true
})
@@ -389,12 +389,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
// clear entry inside the file
fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath)
+ dir.wfs.fsNodeCache.DeleteFsNode(filePath)
if fsNode != nil {
if file, ok := fsNode.(*File); ok {
file.clearEntry()
}
}
- dir.wfs.fsNodeCache.DeleteFsNode(filePath)
// remove current file handle if any
dir.wfs.handlesLock.Lock()
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index ba3280f03..606e52fcb 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -35,15 +35,20 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
return nil, err
}
+ oldEntry := oldFile.getEntry()
+ if oldEntry == nil {
+ return nil, fuse.EIO
+ }
+
// update old file to hardlink mode
- if len(oldFile.entry.HardLinkId) == 0 {
- oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
- oldFile.entry.HardLinkCounter = 1
+ if len(oldEntry.HardLinkId) == 0 {
+ oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
+ oldEntry.HardLinkCounter = 1
}
- oldFile.entry.HardLinkCounter++
+ oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldFile.dir.FullPath(),
- Entry: oldFile.entry,
+ Entry: oldEntry,
Signatures: []int32{dir.wfs.signature},
}
@@ -53,11 +58,11 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
- Attributes: oldFile.entry.Attributes,
- Chunks: oldFile.entry.Chunks,
- Extended: oldFile.entry.Extended,
- HardLinkId: oldFile.entry.HardLinkId,
- HardLinkCounter: oldFile.entry.HardLinkCounter,
+ Attributes: oldEntry.Attributes,
+ Chunks: oldEntry.Chunks,
+ Extended: oldEntry.Extended,
+ HardLinkId: oldEntry.HardLinkId,
+ HardLinkCounter: oldEntry.HardLinkCounter,
},
Signatures: []int32{dir.wfs.signature},
}
@@ -83,6 +88,10 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
return nil
})
+ if err != nil {
+ return nil, fuse.EIO
+ }
+
// create new file node
newNode := dir.newFile(req.NewName, request.Entry)
newFile := newNode.(*File)
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index f05a3a56a..8888cff96 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -30,7 +30,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
- glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
+ glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
@@ -69,7 +69,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
return false
}
- fileSize := int64(pages.f.entry.Attributes.FileSize)
+ entry := pages.f.getEntry()
+ if entry == nil {
+ return false
+ }
+
+ fileSize := int64(entry.Attributes.FileSize)
chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
if chunkSize == 0 {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index a210c5152..5931dd2ff 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -5,6 +5,7 @@ import (
"io"
"os"
"sort"
+ "sync"
"time"
"github.com/seaweedfs/fuse"
@@ -33,6 +34,7 @@ type File struct {
dir *Dir
wfs *WFS
entry *filer_pb.Entry
+ entryLock sync.RWMutex
entryViewCache []filer.VisibleInterval
isOpen int
reader io.ReaderAt
@@ -47,7 +49,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
- entry := file.entry
+ entry := file.getEntry()
if file.isOpen <= 0 || entry == nil {
if entry, err = file.maybeLoadEntry(ctx); err != nil {
return err
@@ -106,7 +108,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req)
- _, err := file.maybeLoadEntry(ctx)
+ entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
@@ -123,12 +125,12 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
if req.Valid.Size() {
- glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
- if req.Size < filer.FileSize(file.entry) {
+ glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
+ if req.Size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
var truncatedChunks []*filer_pb.FileChunk
- for _, chunk := range file.entry.Chunks {
+ for _, chunk := range entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(req.Size) {
// this chunk is truncated
@@ -143,36 +145,36 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
}
- file.entry.Chunks = chunks
+ entry.Chunks = chunks
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
file.reader = nil
}
- file.entry.Attributes.FileSize = req.Size
+ entry.Attributes.FileSize = req.Size
file.dirtyMetadata = true
}
if req.Valid.Mode() {
- file.entry.Attributes.FileMode = uint32(req.Mode)
+ entry.Attributes.FileMode = uint32(req.Mode)
file.dirtyMetadata = true
}
if req.Valid.Uid() {
- file.entry.Attributes.Uid = req.Uid
+ entry.Attributes.Uid = req.Uid
file.dirtyMetadata = true
}
if req.Valid.Gid() {
- file.entry.Attributes.Gid = req.Gid
+ entry.Attributes.Gid = req.Gid
file.dirtyMetadata = true
}
if req.Valid.Crtime() {
- file.entry.Attributes.Crtime = req.Crtime.Unix()
+ entry.Attributes.Crtime = req.Crtime.Unix()
file.dirtyMetadata = true
}
if req.Valid.Mtime() {
- file.entry.Attributes.Mtime = req.Mtime.Unix()
+ entry.Attributes.Mtime = req.Mtime.Unix()
file.dirtyMetadata = true
}
@@ -188,7 +190,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
return nil
}
- return file.saveEntry(file.entry)
+ return file.saveEntry(entry)
}
@@ -258,7 +260,7 @@ func (file *File) Forget() {
}
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
- entry = file.entry
+ entry = file.getEntry()
if file.isOpen > 0 {
return entry, nil
}
@@ -299,8 +301,13 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
}
}
+ entry := file.getEntry()
+ if entry == nil {
+ return
+ }
+
// pick out-of-order chunks from existing chunks
- for _, chunk := range file.entry.Chunks {
+ for _, chunk := range entry.Chunks {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
@@ -318,18 +325,22 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
file.reader = nil
- glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+ glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
- file.entry.Chunks = append(file.entry.Chunks, newChunks...)
+ entry.Chunks = append(entry.Chunks, newChunks...)
}
func (file *File) setEntry(entry *filer_pb.Entry) {
+ file.entryLock.Lock()
+ defer file.entryLock.Unlock()
file.entry = entry
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.reader = nil
}
func (file *File) clearEntry() {
+ file.entryLock.Lock()
+ defer file.entryLock.Unlock()
file.entry = nil
file.entryViewCache = nil
file.reader = nil
@@ -359,3 +370,9 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
return nil
})
}
+
+func (file *File) getEntry() *filer_pb.Entry {
+ file.entryLock.RLock()
+ defer file.entryLock.RUnlock()
+ return file.entry
+}
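
The hunks above serialize all access to File.entry behind the new entryLock: setEntry and clearEntry take the write lock, getEntry takes the read lock, and every former direct read of file.entry is funneled through getEntry plus a nil check. A minimal standalone sketch of the same pattern (types and names here are illustrative, not the SeaweedFS ones):

package main

import (
    "fmt"
    "sync"
)

type Entry struct{ Name string }

type File struct {
    mu    sync.RWMutex
    entry *Entry
}

// getEntry takes the read lock so concurrent readers never observe
// a torn pointer while setEntry/clearEntry swap it out.
func (f *File) getEntry() *Entry {
    f.mu.RLock()
    defer f.mu.RUnlock()
    return f.entry
}

func (f *File) setEntry(e *Entry) {
    f.mu.Lock()
    defer f.mu.Unlock()
    f.entry = e
}

func (f *File) clearEntry() {
    f.mu.Lock()
    defer f.mu.Unlock()
    f.entry = nil
}

func main() {
    f := &File{}
    f.setEntry(&Entry{Name: "a.txt"})
    if e := f.getEntry(); e != nil { // callers must nil-check, as the diff does
        fmt.Println(e.Name)
    }
}

Note the lock only protects the pointer itself: fields of the returned Entry are still mutated outside the lock, which matches this commit's apparent goal of fixing nil-pointer crashes rather than eliminating every data race.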
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index fb073c9cd..25eaf7033 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -40,8 +40,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Uid: uid,
Gid: gid,
}
- if fh.f.entry != nil {
- fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry)
+ entry := fh.f.getEntry()
+ if entry != nil {
+ entry.Attributes.FileSize = filer.FileSize(entry)
}
return fh
@@ -104,22 +105,28 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxSto
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
- fileSize := int64(filer.FileSize(fh.f.entry))
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return 0, io.EOF
+ }
+
+ fileSize := int64(filer.FileSize(entry))
+ fileFullPath := fh.f.fullpath()
if fileSize == 0 {
- glog.V(1).Infof("empty fh %v", fh.f.fullpath())
+ glog.V(1).Infof("empty fh %v", fileFullPath)
return 0, io.EOF
}
- if offset+int64(len(buff)) <= int64(len(fh.f.entry.Content)) {
- totalRead := copy(buff, fh.f.entry.Content[offset:])
- glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead)
+ if offset+int64(len(buff)) <= int64(len(entry.Content)) {
+ totalRead := copy(buff, entry.Content[offset:])
+ glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
}
var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
@@ -136,10 +143,10 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
totalRead, err := reader.ReadAt(buff, offset)
if err != nil && err != io.EOF {
- glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
+ glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
- glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
+ glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@@ -158,8 +165,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
copy(data, req.Data)
}
- fh.f.entry.Content = nil
- fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return fuse.EIO
+ }
+
+ entry.Content = nil
+ entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
@@ -242,35 +254,40 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- if fh.f.entry.Attributes != nil {
- fh.f.entry.Attributes.Mime = fh.contentType
- if fh.f.entry.Attributes.Uid == 0 {
- fh.f.entry.Attributes.Uid = header.Uid
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return nil
+ }
+
+ if entry.Attributes != nil {
+ entry.Attributes.Mime = fh.contentType
+ if entry.Attributes.Uid == 0 {
+ entry.Attributes.Uid = header.Uid
}
- if fh.f.entry.Attributes.Gid == 0 {
- fh.f.entry.Attributes.Gid = header.Gid
+ if entry.Attributes.Gid == 0 {
+ entry.Attributes.Gid = header.Gid
}
- if fh.f.entry.Attributes.Crtime == 0 {
- fh.f.entry.Attributes.Crtime = time.Now().Unix()
+ if entry.Attributes.Crtime == 0 {
+ entry.Attributes.Crtime = time.Now().Unix()
}
- fh.f.entry.Attributes.Mtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
- fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
- fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
+ entry.Attributes.Mtime = time.Now().Unix()
+ entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
+ entry.Attributes.Collection = fh.dirtyPages.collection
+ entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
Directory: fh.f.dir.FullPath(),
- Entry: fh.f.entry,
+ Entry: entry,
Signatures: []int32{fh.f.wfs.signature},
}
- glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
- for i, chunk := range fh.f.entry.Chunks {
+ glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
+ for i, chunk := range entry.Chunks {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
@@ -278,7 +295,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
- fh.f.entry.Chunks = append(chunks, manifestChunks...)
+ entry.Chunks = append(chunks, manifestChunks...)
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
index fdec8253c..6b1012090 100644
--- a/weed/filesys/fscache.go
+++ b/weed/filesys/fscache.go
@@ -124,8 +124,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
}
if f, ok := src.node.(*File); ok {
f.Name = target.name
- if f.entry != nil {
- f.entry.Name = f.Name
+ entry := f.getEntry()
+ if entry != nil {
+ entry.Name = f.Name
}
}
parent.disconnectChild(target)
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 70428bb07..9957a04cd 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -31,6 +31,7 @@ type UploadResult struct {
Mime string `json:"mime,omitempty"`
Gzip uint32 `json:"gzip,omitempty"`
ContentMd5 string `json:"contentMd5,omitempty"`
+ RetryCount int `json:"-"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
@@ -96,6 +97,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by
for i := 0; i < 3; i++ {
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
if err == nil {
+ uploadResult.RetryCount = i
return
} else {
glog.Warningf("uploading to %s: %v", uploadUrl, err)
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 079fbd671..e4d8bee34 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
+ "math"
"os"
"strings"
"time"
@@ -101,12 +102,16 @@ func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string,
}
func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
-
+ // Over-fetch by one entry so the callback can correctly judge whether an entry is the last file.
+ redLimit := limit
+ if limit != math.MaxInt32 && limit != 0 {
+ redLimit = limit + 1
+ }
request := &ListEntriesRequest{
Directory: string(fullDirPath),
Prefix: prefix,
StartFromFileName: startFrom,
- Limit: limit,
+ Limit: redLimit,
InclusiveStartFrom: inclusive,
}
@@ -119,6 +124,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
}
var prevEntry *Entry
+ count := 0
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
@@ -139,6 +145,10 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
}
}
prevEntry = resp.Entry
+ count++
+ if count > int(limit) && limit != 0 {
+ prevEntry = nil
+ }
}
return nil
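
doSeaweedList now over-fetches by one (redLimit = limit + 1) and holds each entry back one iteration: if more than limit entries stream in, the extra entry proves the held-back one was not the last, so prevEntry is dropped instead of being reported with isLast set. A rough self-contained illustration of this hold-one-back idea (hypothetical helper, not the filer API):

package main

import "fmt"

// listWithIsLast visits up to limit items and tells the callback whether
// each visited item is truly the last one, by fetching one extra item.
func listWithIsLast(items []string, limit int, fn func(item string, isLast bool)) {
    fetch := limit + 1 // over-fetch by one to detect the boundary
    if fetch > len(items) {
        fetch = len(items)
    }
    var prev string
    havePrev := false
    count := 0
    for _, it := range items[:fetch] {
        if havePrev {
            fn(prev, false)
        }
        prev, havePrev = it, true
        count++
        if count > limit { // the extra entry proves prev was not last
            havePrev = false
        }
    }
    if havePrev {
        fn(prev, true) // the stream ended within limit: prev really is last
    }
}

func main() {
    listWithIsLast([]string{"a", "b", "c"}, 2, func(s string, last bool) {
        fmt.Println(s, last)
    })
}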
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index 910114313..9efcd9bdc 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -3,6 +3,7 @@ package pb
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"net/http"
"strconv"
"strings"
@@ -108,51 +109,55 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts
}
func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
- colonIndex := strings.LastIndex(server, ":")
- if colonIndex < 0 {
- return "", fmt.Errorf("server should have hostname:port format: %v", server)
- }
+ return ParseServerAddress(server, 10000)
+}
- port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64)
+func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
+
+ host, port, parseErr := hostAndPort(server)
if parseErr != nil {
return "", fmt.Errorf("server port parse error: %v", parseErr)
}
- grpcPort := int(port) + 10000
+ newPort := int(port) + deltaPort
- return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil
+ return fmt.Sprintf("%s:%d", host, newPort), nil
}
-func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
- hostnameAndPort := strings.Split(server, ":")
- if len(hostnameAndPort) != 2 {
- return fmt.Sprintf("unexpected server address: %s", server)
+func hostAndPort(address string) (host string, port uint64, err error) {
+ colonIndex := strings.LastIndex(address, ":")
+ if colonIndex < 0 {
+ return "", 0, fmt.Errorf("server should have hostname:port format: %v", address)
+ }
+ port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64)
+ if err != nil {
+ return "", 0, fmt.Errorf("server port parse error: %v", err)
}
- port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
+ return address[:colonIndex], port, err
+}
+
+func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
+
+ host, port, parseErr := hostAndPort(server)
if parseErr != nil {
- return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
+ glog.Fatalf("server address %s parse error: %v", server, parseErr)
}
grpcPort := int(port) + 10000
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
+ return fmt.Sprintf("%s:%d", host, grpcPort)
}
func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
- hostnameAndPort := strings.Split(grpcAddress, ":")
- if len(hostnameAndPort) != 2 {
- return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress)
- }
-
- grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
+ host, grpcPort, parseErr := hostAndPort(grpcAddress)
if parseErr != nil {
- return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
+ glog.Fatalf("server grpc address %s parse error: %v", grpcAddress, parseErr)
}
port := int(grpcPort) - 10000
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], port)
+ return fmt.Sprintf("%s:%d", host, port)
}
func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
@@ -197,19 +202,3 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption
}, filerGrpcAddress, grpcDialOption)
}
-
-func ParseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(filer, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
- }
-
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("filer port parse error: %v", parseErr)
- }
-
- filerGrpcPort := int(filerPort) + 10000
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
-}
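
The three address helpers above now share one hostAndPort, splitting on the last colon via strings.LastIndex rather than strings.Split, so an address containing more than one colon no longer falls into the "unexpected address" branch. A standalone sketch of the split and the +10000 gRPC port convention:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// hostAndPort mirrors the helper in the diff: split on the LAST colon,
// so "fd00::1:8080" yields host "fd00::1" and port 8080.
func hostAndPort(address string) (string, uint64, error) {
    i := strings.LastIndex(address, ":")
    if i < 0 {
        return "", 0, fmt.Errorf("expect hostname:port, got %q", address)
    }
    port, err := strconv.ParseUint(address[i+1:], 10, 64)
    if err != nil {
        return "", 0, fmt.Errorf("parse port of %q: %v", address, err)
    }
    return address[:i], port, nil
}

func main() {
    host, port, err := hostAndPort("filer.example.com:8888")
    if err == nil {
        // SeaweedFS gRPC convention: HTTP port + 10000
        fmt.Printf("%s:%d\n", host, port+10000) // filer.example.com:18888
    }
}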
diff --git a/weed/pb/volume_info.go b/weed/pb/volume_info.go
index c4f733f5c..cae9e018f 100644
--- a/weed/pb/volume_info.go
+++ b/weed/pb/volume_info.go
@@ -15,40 +15,49 @@ import (
)
// MaybeLoadVolumeInfo loads the file data as *volume_server_pb.VolumeInfo; the returned volumeInfo will not be nil
-func MaybeLoadVolumeInfo(fileName string) (*volume_server_pb.VolumeInfo, bool, error) {
+func MaybeLoadVolumeInfo(fileName string) (volumeInfo *volume_server_pb.VolumeInfo, hasRemoteFile bool, hasVolumeInfoFile bool, err error) {
- volumeInfo := &volume_server_pb.VolumeInfo{}
+ volumeInfo = &volume_server_pb.VolumeInfo{}
glog.V(1).Infof("maybeLoadVolumeInfo checks %s", fileName)
if exists, canRead, _, _, _ := util.CheckFile(fileName); !exists || !canRead {
if !exists {
- return volumeInfo, false, nil
+ return
}
+ hasVolumeInfoFile = true
if !canRead {
glog.Warningf("can not read %s", fileName)
- return volumeInfo, false, fmt.Errorf("can not read %s", fileName)
+ err = fmt.Errorf("can not read %s", fileName)
+ return
}
- return volumeInfo, false, nil
+ return
}
+ hasVolumeInfoFile = true
+
glog.V(1).Infof("maybeLoadVolumeInfo reads %s", fileName)
tierData, readErr := ioutil.ReadFile(fileName)
if readErr != nil {
glog.Warningf("fail to read %s : %v", fileName, readErr)
- return volumeInfo, false, fmt.Errorf("fail to read %s : %v", fileName, readErr)
+ err = fmt.Errorf("fail to read %s : %v", fileName, readErr)
+ return
+
}
glog.V(1).Infof("maybeLoadVolumeInfo Unmarshal volume info %v", fileName)
- if err := jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil {
+ if err = jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil {
glog.Warningf("unmarshal error: %v", err)
- return volumeInfo, false, fmt.Errorf("unmarshal error: %v", err)
+ err = fmt.Errorf("unmarshal error: %v", err)
+ return
}
if len(volumeInfo.GetFiles()) == 0 {
- return volumeInfo, false, nil
+ return
}
- return volumeInfo, true, nil
+ hasRemoteFile = true
+
+ return
}
func SaveVolumeInfo(fileName string, volumeInfo *volume_server_pb.VolumeInfo) error {
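
MaybeLoadVolumeInfo now reports two separate facts: hasVolumeInfoFile (a .vif file exists on disk) and hasRemoteFile (its Files list is non-empty), with named returns keeping each early exit short. A toy stand-in for the same contract, using a map in place of the protobuf type, just to show the caller-visible shape:

package main

import (
    "fmt"
    "os"
)

// loadInfo mimics the new MaybeLoadVolumeInfo contract: info is never nil,
// and two booleans separate "no file at all" from "file present".
func loadInfo(fileName string) (info map[string]string, hasRemote bool, hasFile bool, err error) {
    info = map[string]string{}
    data, readErr := os.ReadFile(fileName)
    if os.IsNotExist(readErr) {
        return // no .vif on disk: hasFile stays false, err stays nil
    }
    hasFile = true
    if readErr != nil {
        err = fmt.Errorf("read %s: %v", fileName, readErr)
        return
    }
    if len(data) > 0 {
        info["files"] = string(data)
        hasRemote = true // a non-empty Files list means a remote tier copy
    }
    return
}

func main() {
    _, hasRemote, hasFile, err := loadInfo("demo.vif")
    fmt.Println(hasRemote, hasFile, err)
}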
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_util.go
index 3514c6977..f642bb801 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 7688029e6..d7e609c68 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -42,7 +42,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return nil
}
var dateKey string
- if r.sink.GetName() == "local_incremental" {
+ if r.sink.IsIncremental() {
var mTime int64
if message.NewEntry != nil {
mTime = message.NewEntry.Attributes.Mtime
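
Replicate previously special-cased the sink named "local_incremental"; now any sink can opt in by returning true from IsIncremental(), and such sinks get keys prefixed with the entry's modification day. A small sketch of that date-key derivation (the exact layout is assumed from the dateKey variable above; UTC is used here for determinism):

package main

import (
    "fmt"
    "path/filepath"
    "time"
)

// dateKey prefixes a replicated key with the modification day,
// the layout incremental sinks use to partition a backlog.
func dateKey(key string, mtimeUnix int64) string {
    day := time.Unix(mtimeUnix, 0).UTC().Format("2006-01-02")
    return filepath.Join(day, key)
}

func main() {
    fmt.Println(dateKey("bucket/a.txt", 1609459200)) // 2021-01-01/bucket/a.txt
}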
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index df70be64b..d13a1049b 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -18,10 +18,11 @@ import (
)
type AzureSink struct {
- containerURL azblob.ContainerURL
- container string
- dir string
- filerSource *source.FilerSource
+ containerURL azblob.ContainerURL
+ container string
+ dir string
+ filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -36,7 +37,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
+func (g *AzureSink) IsIncremental() bool {
+ return g.isIncremental
+}
+
func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
+ g.isIncremental = configuration.GetBool(prefix + "is_incremental")
return g.initialize(
configuration.GetString(prefix+"account_name"),
configuration.GetString(prefix+"account_key"),
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 24f0ecbbc..90a0bb2e8 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -14,10 +14,11 @@ import (
)
type B2Sink struct {
- client *b2.Client
- bucket string
- dir string
- filerSource *source.FilerSource
+ client *b2.Client
+ bucket string
+ dir string
+ filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -32,7 +33,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
+func (g *B2Sink) IsIncremental() bool {
+ return g.isIncremental
+}
+
func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
+ g.isIncremental = configuration.GetBool(prefix + "is_incremental")
return g.initialize(
configuration.GetString(prefix+"b2_account_id"),
configuration.GetString(prefix+"b2_master_application_key"),
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 509f75116..d7c5fccc3 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -30,6 +30,7 @@ type FilerSink struct {
grpcDialOption grpc.DialOption
address string
writeChunkByFiler bool
+ isIncremental bool
}
func init() {
@@ -44,7 +45,12 @@ func (fs *FilerSink) GetSinkToDirectory() string {
return fs.dir
}
+func (fs *FilerSink) IsIncremental() bool {
+ return fs.isIncremental
+}
+
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
+ fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index badabc32c..5cf5b7317 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -18,10 +18,11 @@ import (
)
type GcsSink struct {
- client *storage.Client
- bucket string
- dir string
- filerSource *source.FilerSource
+ client *storage.Client
+ bucket string
+ dir string
+ filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -36,7 +37,12 @@ func (g *GcsSink) GetSinkToDirectory() string {
return g.dir
}
+func (g *GcsSink) IsIncremental() bool {
+ return g.isIncremental
+}
+
func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
+ g.isIncremental = configuration.GetBool(prefix + "is_incremental")
return g.initialize(
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"bucket"),
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index 21c625c3f..2b9b3e69a 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -50,6 +50,10 @@ func (localsink *LocalSink) GetSinkToDirectory() string {
return localsink.Dir
}
+func (localsink *LocalSink) IsIncremental() bool {
+ return true
+}
+
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
if localsink.isMultiPartEntry(key) {
return nil
@@ -74,13 +78,13 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
if _, err := os.Stat(dir); os.IsNotExist(err) {
glog.V(4).Infof("Create Direcotry key: %s", dir)
- if err = os.MkdirAll(dir, 0); err != nil {
+ if err = os.MkdirAll(dir, 0755); err != nil {
return err
}
}
writeFunc := func(data []byte) error {
- writeErr := ioutil.WriteFile(key, data, 0)
+ writeErr := ioutil.WriteFile(key, data, 0755)
return writeErr
}
diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go
index cfc6e0a4d..4ffd09462 100644
--- a/weed/replication/sink/replication_sink.go
+++ b/weed/replication/sink/replication_sink.go
@@ -14,6 +14,7 @@ type ReplicationSink interface {
UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error)
GetSinkToDirectory() string
SetSourceFiler(s *source.FilerSource)
+ IsIncremental() bool
}
var (
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 58432ee6b..9a36573e3 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -21,12 +21,13 @@ import (
)
type S3Sink struct {
- conn s3iface.S3API
- region string
- bucket string
- dir string
- endpoint string
- filerSource *source.FilerSource
+ conn s3iface.S3API
+ region string
+ bucket string
+ dir string
+ endpoint string
+ filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -41,11 +42,17 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
return s3sink.dir
}
+func (s3sink *S3Sink) IsIncremental() bool {
+ return s3sink.isIncremental
+}
+
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
+ glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
+ s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
@@ -67,8 +74,9 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc
s3sink.endpoint = endpoint
config := &aws.Config{
- Region: aws.String(s3sink.region),
- Endpoint: aws.String(s3sink.endpoint),
+ Region: aws.String(s3sink.region),
+ Endpoint: aws.String(s3sink.endpoint),
+ S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
@@ -104,7 +112,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
uploadId, err := s3sink.createMultipartUpload(key, entry)
if err != nil {
- return err
+ return fmt.Errorf("createMultipartUpload: %v", err)
}
totalSize := filer.FileSize(entry)
@@ -120,6 +128,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
defer wg.Done()
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
err = uploadErr
+ glog.Errorf("uploadPart: %v", uploadErr)
} else {
parts[index] = part
}
@@ -129,7 +138,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
if err != nil {
s3sink.abortMultipartUpload(key, uploadId)
- return err
+ return fmt.Errorf("uploadPart: %v", err)
}
return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts)
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index b172ea2c3..3dde52616 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -24,7 +24,7 @@ func (s3sink *S3Sink) deleteObject(key string) error {
result, err := s3sink.conn.DeleteObject(input)
if err == nil {
- glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
+ glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
}
@@ -43,7 +43,7 @@ func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (
result, err := s3sink.conn.CreateMultipartUpload(input)
if err == nil {
- glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
+ glog.V(2).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
return "", err
@@ -94,12 +94,13 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId
result, err := s3sink.conn.CompleteMultipartUpload(input)
if err == nil {
- glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
+ glog.V(2).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
+ return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
}
- return err
+ return nil
}
// To upload a part
@@ -122,7 +123,7 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
result, err := s3sink.conn.UploadPart(input)
if err == nil {
- glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
+ glog.V(2).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
} else {
glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
}
@@ -163,7 +164,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
}
buf := make([]byte, chunk.Size)
for _, fileUrl := range fileUrls {
- _, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf)
+ _, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf)
if err != nil {
glog.V(1).Infof("read from %s: %v", fileUrl, err)
} else {
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 3982360b0..e2e3575dc 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -83,8 +83,12 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
- for _, loc := range locations.Locations {
- fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
+ if !fs.proxyByFiler {
+ for _, loc := range locations.Locations {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part))
+ }
+ } else {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part))
}
return
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 3626ece98..12a74126a 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -31,6 +31,10 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
return nil
}, startFrom, inclusive, limit)
+ if len(entries) == 0 {
+ isLast = true
+ }
+
return
}
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 338f82668..48e8cb047 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -51,7 +51,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
var buckets []*s3.Bucket
for _, entry := range entries {
if entry.IsDirectory {
- if identity != nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) {
+ if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name) {
continue
}
buckets = append(buckets, &s3.Bucket{
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 19d85c495..610daef9f 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -5,9 +5,11 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"io"
"io/ioutil"
"net/http"
+ "net/url"
"sort"
"strings"
@@ -69,7 +71,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
} else {
- uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
@@ -84,6 +86,14 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeSuccessResponseEmpty(w)
}
+func urlPathEscape(object string) string {
+ var escapedParts []string
+ for _, part := range strings.Split(object, "/") {
+ escapedParts = append(escapedParts, url.PathEscape(part))
+ }
+ return strings.Join(escapedParts, "/")
+}
+
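
urlPathEscape escapes each segment of the object key separately, so spaces, '#', and similar characters survive the proxied URL to the filer while the '/' separators stay literal; calling url.PathEscape on the whole key would encode the slashes as %2F. A quick demonstration:

package main

import (
    "fmt"
    "net/url"
    "strings"
)

func urlPathEscape(object string) string {
    var escapedParts []string
    for _, part := range strings.Split(object, "/") {
        escapedParts = append(escapedParts, url.PathEscape(part))
    }
    return strings.Join(escapedParts, "/")
}

func main() {
    fmt.Println(urlPathEscape("dir/a b#c.txt"))  // dir/a%20b%23c.txt
    fmt.Println(url.PathEscape("dir/a b#c.txt")) // dir%2Fa%20b%23c.txt: slashes lost
}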
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r)
@@ -94,7 +104,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
@@ -105,7 +115,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
bucket, object := getBucketAndObject(r)
destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
@@ -116,7 +126,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
bucket, object := getBucketAndObject(r)
destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) {
for k, v := range proxyResponse.Header {
@@ -196,6 +206,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
if err == nil {
directoriesWithDeletion[parentDirectoryPath]++
deletedObjects = append(deletedObjects, object)
+ } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
+ deletedObjects = append(deletedObjects, object)
} else {
delete(directoriesWithDeletion, parentDirectoryPath)
deleteErrors = append(deleteErrors, DeleteError{
diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go
index 044e732db..035302ae6 100644
--- a/weed/s3api/s3api_object_handlers_postpolicy.go
+++ b/weed/s3api/s3api_object_handlers_postpolicy.go
@@ -110,7 +110,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
}
}
- uploadUrl := fmt.Sprintf("http://%s%s/%s/%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody)
diff --git a/weed/security/tls.go b/weed/security/tls.go
index b4bf84e2d..7d3ffcdca 100644
--- a/weed/security/tls.go
+++ b/weed/security/tls.go
@@ -1,10 +1,16 @@
package security
import (
+ "context"
"crypto/tls"
"crypto/x509"
"github.com/chrislusf/seaweedfs/weed/util"
+ grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
"io/ioutil"
+ "strings"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -12,21 +18,29 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
)
-func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption {
+type Authenticator struct {
+ AllowedWildcardDomain string
+ AllowedCommonNames map[string]bool
+}
+
+func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption, grpc.ServerOption) {
if config == nil {
- return nil
+ return nil, nil
}
// load cert/key, ca cert
cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), config.GetString(component+".key"))
if err != nil {
- glog.V(1).Infof("load cert/key error: %v", err)
- return nil
+ glog.V(1).Infof("load cert: %s / key: %s error: %v",
+ config.GetString(component+".cert"),
+ config.GetString(component+".key"),
+ err)
+ return nil, nil
}
caCert, err := ioutil.ReadFile(config.GetString("grpc.ca"))
if err != nil {
- glog.V(1).Infof("read ca cert file error: %v", err)
- return nil
+ glog.V(1).Infof("read ca cert file %s error: %v", config.GetString("grpc.ca"), err)
+ return nil, nil
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
@@ -36,7 +50,20 @@ func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption
ClientAuth: tls.RequireAndVerifyClientCert,
})
- return grpc.Creds(ta)
+ allowedCommonNames := config.GetString(component + ".allowed_commonNames")
+ allowedWildcardDomain := config.GetString("grpc.allowed_wildcard_domain")
+ if allowedCommonNames != "" || allowedWildcardDomain != "" {
+ allowedCommonNamesMap := make(map[string]bool)
+ for _, s := range strings.Split(allowedCommonNames, ",") {
+ allowedCommonNamesMap[s] = true
+ }
+ auther := Authenticator{
+ AllowedCommonNames: allowedCommonNamesMap,
+ AllowedWildcardDomain: allowedWildcardDomain,
+ }
+ return grpc.Creds(ta), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(auther.Authenticate))
+ }
+ return grpc.Creds(ta), nil
}
func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
@@ -70,3 +97,28 @@ func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
})
return grpc.WithTransportCredentials(ta)
}
+
+func (a Authenticator) Authenticate(ctx context.Context) (newCtx context.Context, err error) {
+ p, ok := peer.FromContext(ctx)
+ if !ok {
+ return ctx, status.Error(codes.Unauthenticated, "no peer found")
+ }
+
+ tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
+ if !ok {
+ return ctx, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
+ }
+ if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
+ return ctx, status.Error(codes.Unauthenticated, "could not verify peer certificate")
+ }
+
+ commonName := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
+ if a.AllowedWildcardDomain != "" && strings.HasSuffix(commonName, a.AllowedWildcardDomain) {
+ return ctx, nil
+ }
+ if _, ok := a.AllowedCommonNames[commonName]; ok {
+ return ctx, nil
+ }
+
+ return ctx, status.Errorf(codes.Unauthenticated, "invalid subject common name: %s", commonName)
+}
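
LoadServerTLS now returns a pair of server options: the transport credentials plus, when allowed_commonNames or grpc.allowed_wildcard_domain is configured, a unary interceptor that rejects client certificates whose CommonName is not allowed. A hedged caller-side wiring sketch, assuming security.toml has been set up and the usual SeaweedFS util helpers are available (component name is illustrative):

package main

import (
    "google.golang.org/grpc"

    "github.com/chrislusf/seaweedfs/weed/security"
    "github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
    // Assumes a security.toml is discoverable; both options may be nil.
    util.LoadConfiguration("security", false)
    var opts []grpc.ServerOption
    creds, auth := security.LoadServerTLS(util.GetViper(), "grpc.volume")
    if creds != nil {
        opts = append(opts, creds)
    }
    if auth != nil {
        // Enforces the allowed_commonNames / wildcard-domain check per RPC.
        opts = append(opts, auth)
    }
    _ = grpc.NewServer(opts...)
}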
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 22474a5e2..9e0770afa 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -153,7 +153,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
for _, master := range fs.option.Masters {
- _, err := pb.ParseFilerGrpcAddress(master)
+ _, err := pb.ParseServerToGrpcAddress(master)
if err != nil {
glog.Fatalf("invalid master address %s: %v", master, err)
}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index f77462adb..892a732f7 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -61,15 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
- if len(entry.Chunks) == 0 && len(entry.Content) == 0 {
- glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
- stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
- w.WriteHeader(http.StatusNoContent)
- return
- }
-
w.Header().Set("Accept-Ranges", "bytes")
- w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
// mime type
mimeType := entry.Attr.Mime
@@ -164,6 +156,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
+ if err != nil {
+ glog.Errorf("failed to write entry content: %v", err)
+ }
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d3ce7e605..318399281 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -38,10 +38,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
chunkSize := 1024 * 1024 * maxMB
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ stats.FilerRequestCounter.WithLabelValues("chunk").Inc()
start := time.Now()
defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds())
}()
var reply *FilerPostResult
@@ -302,13 +302,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
- stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
+ stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
start := time.Now()
defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
+ stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
}()
uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ if uploadResult != nil && uploadResult.RetryCount > 0 {
+ stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
+ }
return uploadResult, err, data
}
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 29aff5c0b..156afd4a1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
- return nil, fmt.Errorf("no free volumes left for "+option.String())
+ return nil, fmt.Errorf("no free volumes left for " + option.String())
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
@@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
-
- totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
-
resp := &master_pb.StatisticsResponse{
- TotalSize: uint64(totalSize),
+ TotalSize: stats.TotalSize,
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
new file mode 100644
index 000000000..a009611da
--- /dev/null
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -0,0 +1,137 @@
+package weed_server
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "net"
+ "strings"
+)
+
+func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
+ defer c.Close()
+
+ glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
+
+ bufReader := bufio.NewReaderSize(c, 1024*1024)
+ bufWriter := bufio.NewWriterSize(c, 1024*1024)
+
+ for {
+ cmd, err := bufReader.ReadString('\n')
+ if err != nil {
+ if err != io.EOF {
+ glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ }
+ return
+ }
+ cmd = cmd[:len(cmd)-1]
+ if len(cmd) == 0 {
+ continue
+ }
+ switch cmd[0] {
+ case '+':
+ fileId := cmd[1:]
+ err = vs.handleTcpPut(fileId, bufReader)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '-':
+ fileId := cmd[1:]
+ err = vs.handleTcpDelete(fileId)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '?':
+ fileId := cmd[1:]
+ err = vs.handleTcpGet(fileId, bufWriter)
+ case '!':
+ bufWriter.Flush()
+ }
+
+ }
+
+}
+
+func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ err = volume.StreamRead(n, writer)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ sizeBuf := make([]byte, 4)
+ if _, err = io.ReadFull(bufReader, sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ err = volume.StreamWrite(n, bufReader, dataSize)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ _, err = vs.store.DeleteVolumeNeedle(volumeId, n)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
+
+ commaIndex := strings.LastIndex(fileId, ",")
+ if commaIndex <= 0 {
+ return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
+ }
+
+ vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
+
+ volumeId, ve := needle.NewVolumeId(vid)
+ if ve != nil {
+ return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
+ }
+
+ n := new(needle.Needle)
+ n.ParsePath(fid)
+ return volumeId, n, nil
+}
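
The new handler speaks a tiny line-oriented protocol: "+<fileId>\n" followed by a 4-byte big-endian length and the body stores a needle, "-<fileId>\n" deletes, "?<fileId>\n" streams a read, and "!" flushes the buffered replies ("+OK" or "-ERR ..."). A client sketch inferred from the handler above, untested against a live volume server:

package main

import (
    "bufio"
    "encoding/binary"
    "fmt"
    "net"
)

// put writes one needle over the volume server's raw TCP protocol:
// "+<fileId>\n", then a 4-byte big-endian length, then the payload.
func put(addr, fileId string, data []byte) error {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return err
    }
    defer conn.Close()

    w := bufio.NewWriter(conn)
    fmt.Fprintf(w, "+%s\n", fileId)
    var size [4]byte
    binary.BigEndian.PutUint32(size[:], uint32(len(data)))
    w.Write(size[:])
    w.Write(data)
    w.WriteString("!\n") // '!' asks the server to flush its buffered reply
    if err := w.Flush(); err != nil {
        return err
    }

    status, err := bufio.NewReader(conn).ReadString('\n')
    if err != nil {
        return err
    }
    fmt.Printf("server replied: %s", status) // expect "+OK\n"
    return nil
}

func main() {
    // Address and file id are placeholders for a local volume server.
    _ = put("127.0.0.1:20000", "3,01637037d6", []byte("hello"))
}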
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index edacf22c6..634cb11e2 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -274,7 +274,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
- fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
+ fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds)
vidMap := make(map[uint32]bool)
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go
new file mode 100644
index 000000000..1fe13d981
--- /dev/null
+++ b/weed/shell/command_s3_clean_uploads.go
@@ -0,0 +1,92 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "math"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandS3CleanUploads{})
+}
+
+type commandS3CleanUploads struct {
+}
+
+func (c *commandS3CleanUploads) Name() string {
+ return "s3.clean.uploads"
+}
+
+func (c *commandS3CleanUploads) Help() string {
+ return `clean up stale multipart uploads
+
+ Example:
+ s3.clean.uploads -timeAgo 24h
+
+`
+}
+
+func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "delete uploads created more than this long ago, e.g. \"1.5h\" or \"2h45m\"; valid time units are \"m\" and \"h\"")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ var filerBucketsPath string
+ filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+ if err != nil {
+ return fmt.Errorf("read buckets: %v", err)
+ }
+
+ var buckets []string
+ err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
+ buckets = append(buckets, entry.Name)
+ return nil
+ }, "", false, math.MaxUint32)
+ if err != nil {
+ return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
+ }
+
+ for _, bucket := range buckets {
+ c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo)
+ }
+
+ return err
+
+}
+
+func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error {
+ uploadsDir := filerBucketsPath + "/" + bucket + "/.uploads"
+ var staleUploads []string
+ now := time.Now()
+ err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ ctime := time.Unix(entry.Attributes.Crtime, 0)
+ if ctime.Add(timeAgo).Before(now) {
+ staleUploads = append(staleUploads, entry.Name)
+ }
+ return nil
+ }, "", false, math.MaxUint32)
+ if err != nil {
+ return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
+ }
+
+ for _, staleUpload := range staleUploads {
+ deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerHost, commandEnv.option.FilerPort, uploadsDir, staleUpload)
+ fmt.Fprintf(writer, "purge %s\n", deleteUrl)
+
+ err = util.Delete(deleteUrl, "")
+ if err != nil {
+ return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
+ }
+ }
+
+ return nil
+
+}
\ No newline at end of file
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index a15efd6b3..f7fa94031 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -102,7 +102,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin
keepDataNodesSorted(allLocations, toDiskType)
fn := capacityByFreeVolumeCount(toDiskType)
for _, dst := range allLocations {
- if fn(dst.dataNode) > 0 {
+ if fn(dst.dataNode) > 0 && !hasFoundTarget {
// ask the volume server to replicate the volume
if isOneOf(dst.dataNode.Id, locations) {
continue
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index b8b883be6..2dc61d02e 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -58,6 +58,9 @@ func LoadConfiguration(config *util.ViperProxy) {
if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
continue
}
+ if _, found := BackendStorages[backendTypeName+"."+backendStorageId]; found {
+ continue
+ }
backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
if buildErr != nil {
@@ -81,6 +84,9 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
glog.Warningf("storage type %s not found", storageBackend.Type)
continue
}
+ if _, found := BackendStorages[storageBackend.Type+"."+storageBackend.Id]; found {
+ continue
+ }
backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
if buildErr != nil {
glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 498963c31..3b42429cf 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -52,7 +52,7 @@ func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
return
}
-func (df *DiskFile) Append(p []byte) (n int, err error) {
+func (df *DiskFile) Write(p []byte) (n int, err error) {
return df.WriteAt(p, df.fileSize)
}
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index e2fdf1eb6..b8378c379 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -34,8 +34,9 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region, endpoint string)
}
config := &aws.Config{
- Region: aws.String(region),
- Endpoint: aws.String(endpoint),
+ Region: aws.String(region),
+ Endpoint: aws.String(endpoint),
+ S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 85d6a5fc8..171db92a4 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -63,7 +63,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
// read volume info
ev.Version = needle.Version3
- if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
+ if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go
index 6fd910bb7..22456faa2 100644
--- a/weed/storage/needle/crc.go
+++ b/weed/storage/needle/crc.go
@@ -2,6 +2,7 @@ package needle
import (
"fmt"
+ "io"
"github.com/klauspost/crc32"
@@ -29,3 +30,25 @@ func (n *Needle) Etag() string {
util.Uint32toBytes(bits, uint32(n.Checksum))
return fmt.Sprintf("%x", bits)
}
+
+func NewCRCwriter(w io.Writer) *CRCwriter {
+
+ return &CRCwriter{
+ crc: CRC(0),
+ w: w,
+ }
+
+}
+
+type CRCwriter struct {
+ crc CRC
+ w io.Writer
+}
+
+func (c *CRCwriter) Write(p []byte) (n int, err error) {
+ n, err = c.w.Write(p) // with each write ...
+ c.crc = c.crc.Update(p)
+ return
+}
+
+func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash
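
CRCwriter tees every Write through a running CRC so a needle body can be streamed and checksummed in a single pass; the package's CRC type appears to use the Castagnoli polynomial via klauspost/crc32. A standalone equivalent built on the standard library's hash/crc32:

package main

import (
    "bytes"
    "fmt"
    "hash/crc32"
    "io"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// crcWriter mirrors needle.CRCwriter with stdlib pieces: wrap a writer,
// update a Castagnoli CRC32 on every chunk, read the sum at the end.
type crcWriter struct {
    crc uint32
    w   io.Writer
}

func (c *crcWriter) Write(p []byte) (int, error) {
    n, err := c.w.Write(p)
    c.crc = crc32.Update(c.crc, castagnoli, p[:n]) // only count bytes actually written
    return n, err
}

func (c *crcWriter) Sum() uint32 { return c.crc }

func main() {
    var buf bytes.Buffer
    w := &crcWriter{w: &buf}
    io.Copy(w, bytes.NewReader([]byte("needle body")))
    fmt.Printf("%d bytes, crc %08x\n", buf.Len(), w.Sum())
}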
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 0f72bc0bb..e51df955e 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -168,7 +168,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
}
if err != nil {
fileSize, _, _ := r.GetStat()
- println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
+ println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
}
return dataSlice, err
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 9716e9729..31c86d124 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -152,8 +152,10 @@ func (m *LevelDbNeedleMap) Close() {
glog.Warningf("close index file %s failed: %v", indexFileName, err)
}
- if err := m.db.Close(); err != nil {
- glog.Warningf("close levelDB failed: %v", err)
+ if m.db != nil {
+ if err := m.db.Close(); err != nil {
+ glog.Warningf("close levelDB failed: %v", err)
+ }
}
}
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index 3449ff9dc..662b90531 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -94,8 +94,12 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
}
func (m *SortedFileNeedleMap) Close() {
- m.indexFile.Close()
- m.dbFile.Close()
+ if m.indexFile != nil {
+ m.indexFile.Close()
+ }
+ if m.dbFile != nil {
+ m.dbFile.Close()
+ }
}
func (m *SortedFileNeedleMap) Destroy() error {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 47829666a..fb33a708c 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -220,20 +220,30 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
if maxFileKey < curMaxFileKey {
maxFileKey = curMaxFileKey
}
+ deleteVolume := false
if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, volumeMessage)
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id)
+ deleteVolume = true
} else {
glog.V(0).Infof("volume %d is expired", v.Id)
}
if v.lastIoError != nil {
deleteVids = append(deleteVids, v.Id)
+ deleteVolume = true
glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
}
}
- collectionVolumeSize[v.Collection] += volumeMessage.Size
+
+ if _, exist := collectionVolumeSize[v.Collection]; !exist {
+ collectionVolumeSize[v.Collection] = 0
+ }
+ if !deleteVolume {
+ collectionVolumeSize[v.Collection] += volumeMessage.Size
+ }
+
if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
"IsReadOnly": 0,
@@ -242,7 +252,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
"isDiskSpaceLow": 0,
}
}
- if v.IsReadOnly() {
+ if !deleteVolume && v.IsReadOnly() {
collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
if v.noWriteOrDelete {
collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
@@ -267,7 +277,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
glog.V(0).Infof("volume %d is deleted", vid)
}
} else {
- glog.V(0).Infof("delete volume %d: %v", vid, err)
+ glog.Warningf("delete volume %d: %v", vid, err)
}
}
location.volumesLock.Unlock()
@@ -446,7 +456,7 @@ func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
// load, modify, save
baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
vifFile := filepath.Join(location.Directory, baseFileName+".vif")
- volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
+ volumeInfo, _, _, err := pb.MaybeLoadVolumeInfo(vifFile)
if err != nil {
return fmt.Errorf("volume %d fail to load vif", i)
}
diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go
index 65ec53819..a263e6669 100644
--- a/weed/storage/super_block/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -36,6 +36,9 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
}
func (rp *ReplicaPlacement) Byte() byte {
+ if rp == nil {
+ return 0
+ }
ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
return byte(ret)
}
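
The nil guard matters because the placement is encoded into a super block byte even when no placement was configured. The encoding packs the three counts into decimal digits, so replication "110" (one extra copy in a different data center, one in a different rack) round-trips as the byte value 110. A minimal sketch, assuming the same struct fields:

    package main

    import "fmt"

    type ReplicaPlacement struct {
        SameRackCount       int
        DiffRackCount       int
        DiffDataCenterCount int
    }

    func (rp *ReplicaPlacement) Byte() byte {
        if rp == nil {
            return 0 // nil now encodes as 0 instead of panicking
        }
        return byte(rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount)
    }

    func main() {
        rp := &ReplicaPlacement{DiffDataCenterCount: 1, DiffRackCount: 1}
        fmt.Println(rp.Byte()) // 110
        var none *ReplicaPlacement
        fmt.Println(none.Byte()) // 0, no panic
    }
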
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index bff1055bb..0cf603ad8 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -39,12 +39,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
}()
- hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
+ hasVolumeInfoFile := v.maybeLoadVolumeInfo()
if v.HasRemoteFile() {
v.noWriteCanDelete = true
v.noWriteOrDelete = false
- glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
+ glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
v.LoadRemoteFile()
alreadyHasSuperBlock = true
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
@@ -83,6 +83,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if alreadyHasSuperBlock {
err = v.readSuperBlock()
+ glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
+ if err != nil && v.HasRemoteFile() {
+ // maybe a temporary network problem; keep serving the remote volume

+ glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
+ err = nil
+ }
} else {
if !v.SuperBlock.Initialized() {
return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 07376bc88..1853e458a 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -104,47 +104,8 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
return
}
- if v.isFileUnchanged(n) {
- size = Size(n.DataSize)
- isUnchanged = true
- return
- }
-
- // check whether existing needle cookie matches
- nv, ok := v.nm.Get(n.Id)
- if ok {
- existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
- if existingNeedleReadErr != nil {
- err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
- return
- }
- if existingNeedle.Cookie != n.Cookie {
- glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
- err = fmt.Errorf("mismatching cookie %x", n.Cookie)
- return
- }
- }
-
- // append to dat file
- n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, size, _, err = n.Append(v.DataBackend, v.Version())
- v.checkReadWriteError(err)
- if err != nil {
- return
- }
-
- v.lastAppendAtNs = n.AppendAtNs
- // add to needle map
- if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
- if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
- glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
- }
- }
- if v.lastModifiedTsSeconds < n.LastModified {
- v.lastModifiedTsSeconds = n.LastModified
- }
- return
+ return v.doWriteRequest(n)
}
func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
@@ -223,24 +184,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
return 0, err
}
- nv, ok := v.nm.Get(n.Id)
- // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok && nv.Size.IsValid() {
- size := nv.Size
- n.Data = nil
- n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, _, _, err := n.Append(v.DataBackend, v.Version())
- v.checkReadWriteError(err)
- if err != nil {
- return size, err
- }
- v.lastAppendAtNs = n.AppendAtNs
- if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
- return size, err
- }
- return size, err
- }
- return 0, nil
+ return v.doDeleteRequest(n)
}
func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
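
The deleted bodies above are not lost: syncWrite and syncDelete now delegate to doWriteRequest and doDeleteRequest, the same cores used by the non-fsync paths, so the cookie check, append, and needle-map update live in one place. A minimal sketch of the wrapper-delegates-to-core pattern (hypothetical names, not the actual signatures):

    package main

    import "sync"

    type volume struct{ mu sync.Mutex }

    // doWrite is the shared core; callers must hold mu.
    func (v *volume) doWrite(data []byte) error { _ = data; return nil }

    // syncWrite locks, then delegates, mirroring syncWrite -> doWriteRequest.
    func (v *volume) syncWrite(data []byte) error {
        v.mu.Lock()
        defer v.mu.Unlock()
        return v.doWrite(data)
    }

    func main() { _ = (&volume{}).syncWrite(nil) }
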
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go
new file mode 100644
index 000000000..955875aa2
--- /dev/null
+++ b/weed/storage/volume_stream_write.go
@@ -0,0 +1,104 @@
+package storage
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ df, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("unexpected volume backend")
+ }
+ offset, _, e := v.DataBackend.GetStat()
+ if e != nil {
+ return fmt.Errorf("volume %d: stat data backend: %v", v.Id, e)
+ }
+
+ header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
+ CookieToBytes(header[0:CookieSize], n.Cookie)
+ NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
+ n.Size = 4 + Size(dataSize) + 1 // 4-byte data-size field, plus the data, plus 1 flags byte
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+
+ n.DataSize = dataSize
+
+ // needle header
+ df.Write(header[0:NeedleHeaderSize])
+
+ // data size and data
+ util.Uint32toBytes(header[0:4], n.DataSize)
+ df.Write(header[0:4])
+ // write the payload and calculate the CRC; abort on a failed copy
+ crcWriter := needle.NewCRCwriter(df)
+ if _, err = io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))); err != nil {
+ return
+ }
+
+ // flags
+ util.Uint8toBytes(header[0:1], n.Flags)
+ df.Write(header[0:1])
+
+ // data checksum
+ util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
+ // write timestamp, padding
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
+ padding := needle.PaddingLength(n.Size, needle.Version3)
+ df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
+
+ // add to needle map
+ if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
+ }
+ return
+}
+
+func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return ErrorNotFound
+ }
+
+ sr := &StreamReader{
+ readerAt: v.DataBackend,
+ offset: nv.Offset.ToActualOffset(),
+ }
+ bufReader := bufio.NewReader(sr)
+ if _, err = bufReader.Discard(NeedleHeaderSize); err != nil {
+ return err
+ }
+ sizeBuf := make([]byte, 4)
+ if _, err = io.ReadFull(bufReader, sizeBuf); err != nil {
+ return err
+ }
+ if _, err = writer.Write(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
+
+ return
+}
+
+type StreamReader struct {
+ offset int64
+ readerAt io.ReaderAt
+}
+
+func (sr *StreamReader) Read(p []byte) (n int, err error) {
+ n, err = sr.readerAt.ReadAt(p, sr.offset)
+ if err != nil {
+ return
+ }
+ sr.offset += int64(n)
+ return
+}
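
A usage sketch for the new streaming entry points, assuming a loaded *storage.Volume backed by a backend.DiskFile (StreamWrite rejects other backends) and a needle whose Id and Cookie are already set:

    package example

    import (
        "bytes"

        "github.com/chrislusf/seaweedfs/weed/storage"
        "github.com/chrislusf/seaweedfs/weed/storage/needle"
    )

    // putAndGet streams a payload into the volume and back out without
    // ever holding the whole blob in an intermediate buffer.
    func putAndGet(v *storage.Volume, n *needle.Needle, payload []byte) error {
        if err := v.StreamWrite(n, bytes.NewReader(payload), uint32(len(payload))); err != nil {
            return err
        }
        var out bytes.Buffer // receives the 4-byte size prefix, then the data
        return v.StreamRead(n, &out)
    }
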
diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go
index 77efd8a14..23160906b 100644
--- a/weed/storage/volume_tier.go
+++ b/weed/storage/volume_tier.go
@@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
@@ -14,13 +15,23 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
func (v *Volume) maybeLoadVolumeInfo() (found bool) {
- v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
+ var err error
+ v.volumeInfo, v.hasRemoteFile, found, err = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
+
+ if v.volumeInfo.Version == 0 {
+ v.volumeInfo.Version = uint32(needle.CurrentVersion)
+ }
if v.hasRemoteFile {
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
}
+ if err != nil {
+ glog.Warningf("load volume %d.vif file: %v", v.Id, err)
+ return
+ }
+
return
}
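
Callers see this change through the new four-value signature: MaybeLoadVolumeInfo now returns (volumeInfo, hasRemoteFile, found, err), which is why store.go above switched to the four-value form. The Version fallback keeps downstream code from ever observing version 0; a sketch of that defaulting, with a stand-in constant:

    package main

    import "fmt"

    const currentVersion = 3 // stands in for needle.CurrentVersion

    // normalizeVersion mirrors the fallback above: a zero Version from a
    // missing or versionless .vif is replaced with the current version.
    func normalizeVersion(v uint32) uint32 {
        if v == 0 {
            return currentVersion
        }
        return v
    }

    func main() {
        fmt.Println(normalizeVersion(0), normalizeVersion(2)) // 3 2
    }
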
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 0ee1e61c6..be84f8a13 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -286,7 +286,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if err != nil {
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
}
- dstDatBackend.Append(needleBytes)
+ dstDatBackend.Write(needleBytes)
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index df1b6d658..330b16b24 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -25,7 +25,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
existingEcShards := dn.GetEcShards()
- // found out the newShards and deletedShards
+ // find out the newShards and deletedShards
var newShardCount, deletedShardCount int
for _, ecShards := range existingEcShards {
@@ -56,20 +56,19 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
+
for _, ecShards := range actualShards {
+ if dn.hasEcShards(ecShards.VolumeId) {
+ continue
+ }
+
+ newShards = append(newShards, ecShards)
disk := dn.getOrCreateDisk(ecShards.DiskType)
deltaDiskUsages := newDiskUsages()
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
-
- if !dn.hasEcShards(ecShards.VolumeId) {
- newShards = append(newShards, ecShards)
- newShardCount += ecShards.ShardIdCount()
- }
-
- deltaDiskUsage.ecShardCount = int64(newShardCount)
+ deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
-
}
if len(newShards) > 0 || len(deletedShards) > 0 {
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 5784c894b..c7e171248 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -432,7 +432,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
if vl.readonlyVolumes.IsTrue(vid) {
ret.TotalSize += size
} else {
- ret.TotalSize += vl.volumeSizeLimit
+ ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length())
}
}
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 64c13ca52..548c4cd25 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -82,7 +82,7 @@ func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64
if dnl.LastSeen < freshThreshHold {
vinfo, err := dnl.GetVolumesById(vid)
if err == nil {
- return vinfo.Size - vinfo.DeletedByteCount, vinfo.FileCount - vinfo.DeleteCount
+ return (vinfo.Size - vinfo.DeletedByteCount) * uint64(len(dnll.list)), vinfo.FileCount - vinfo.DeleteCount
}
}
}
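
Both topology fixes above correct the same under-counting: the stats previously counted one replica's bytes regardless of replica count. With the fix, a volume with Size 30 GiB, DeletedByteCount 10 GiB, and 2 locations in dnll.list now reports (30 - 10) * 2 = 40 GiB, and a writable layout contributes volumeSizeLimit once per location in the list rather than once per volume id.
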
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 908f782ae..1cd2f160a 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 27)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 31)
COMMIT = ""
)
diff --git a/weed/util/fasthttp_util.go b/weed/util/fasthttp_util.go
index 82575af98..6c31a40da 100644
--- a/weed/util/fasthttp_util.go
+++ b/weed/util/fasthttp_util.go
@@ -18,6 +18,7 @@ var (
WriteTimeout: time.Second,
MaxIdleConnDuration: time.Minute,
DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this.
+ DialDualStack: true,
}
// Put everything in pools to prevent garbage.
diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go
index 4a5884e9b..eb5700e8c 100644
--- a/weed/util/fla9/fla9.go
+++ b/weed/util/fla9/fla9.go
@@ -886,7 +886,7 @@ func (f *FlagSet) parseOne() (bool, error) {
// The return value will be ErrHelp if -help or -h were set but not defined.
func (f *FlagSet) Parse(arguments []string) error {
if _, ok := f.formal[DefaultConfigFlagName]; !ok {
- f.String(DefaultConfigFlagName, "", "config file")
+ f.String(DefaultConfigFlagName, "", "a file of command line options, each line in optionName=optionValue format")
}
f.parsed = true
@@ -995,6 +995,7 @@ func NewFlagSet(name string, errorHandling ErrorHandling) *FlagSet {
f := &FlagSet{
name: name,
errorHandling: errorHandling,
+ envPrefix: EnvPrefix,
}
return f
}
@@ -1078,7 +1079,7 @@ func NewFlagSetWithEnvPrefix(name string, prefix string, errorHandling ErrorHand
// DefaultConfigFlagName defines the flag name of the optional config file
// path. Used to lookup and parse the config file when a default is set and
// available on disk.
-var DefaultConfigFlagName = "conf"
+var DefaultConfigFlagName = "options"
// ParseFile parses flags from the file in path.
// Same format as commandline arguments, newlines and lines beginning with a
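
With the rename, the auto-registered flag is -options and the referenced file holds one optionName=optionValue pair per line. A hypothetical invocation (the path and option values are illustrative):

    # /etc/seaweedfs/server.options
    # each line is optionName=optionValue
    dir=/data/seaweedfs
    ip=10.0.0.5

    weed server -options=/etc/seaweedfs/server.options
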
diff --git a/weed/wdclient/net2/base_connection_pool.go b/weed/wdclient/net2/base_connection_pool.go
new file mode 100644
index 000000000..5cc037d0f
--- /dev/null
+++ b/weed/wdclient/net2/base_connection_pool.go
@@ -0,0 +1,159 @@
+package net2
+
+import (
+ "net"
+ "strings"
+ "time"
+
+ rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
+)
+
+const defaultDialTimeout = 1 * time.Second
+
+func defaultDialFunc(network string, address string) (net.Conn, error) {
+ return net.DialTimeout(network, address, defaultDialTimeout)
+}
+
+func parseResourceLocation(resourceLocation string) (
+ network string,
+ address string) {
+
+ idx := strings.Index(resourceLocation, " ")
+ if idx >= 0 {
+ return resourceLocation[:idx], resourceLocation[idx+1:]
+ }
+
+ return "", resourceLocation
+}
+
+// A thin wrapper around the underlying resource pool.
+type connectionPoolImpl struct {
+ options ConnectionOptions
+
+ pool rp.ResourcePool
+}
+
+// This returns a connection pool where all connections are connected
+// to the same (network, address)
+func newBaseConnectionPool(
+ options ConnectionOptions,
+ createPool func(rp.Options) rp.ResourcePool) ConnectionPool {
+
+ dial := options.Dial
+ if dial == nil {
+ dial = defaultDialFunc
+ }
+
+ openFunc := func(loc string) (interface{}, error) {
+ network, address := parseResourceLocation(loc)
+ return dial(network, address)
+ }
+
+ closeFunc := func(handle interface{}) error {
+ return handle.(net.Conn).Close()
+ }
+
+ poolOptions := rp.Options{
+ MaxActiveHandles: options.MaxActiveConnections,
+ MaxIdleHandles: options.MaxIdleConnections,
+ MaxIdleTime: options.MaxIdleTime,
+ OpenMaxConcurrency: options.DialMaxConcurrency,
+ Open: openFunc,
+ Close: closeFunc,
+ NowFunc: options.NowFunc,
+ }
+
+ return &connectionPoolImpl{
+ options: options,
+ pool: createPool(poolOptions),
+ }
+}
+
+// This returns a connection pool where all connections are connected
+// to the same (network, address)
+func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool {
+ return newBaseConnectionPool(options, rp.NewSimpleResourcePool)
+}
+
+// This returns a connection pool that manages multiple (network, address)
+// entries. The connections to each (network, address) entry act
+// independently. For example ("tcp", "localhost:11211") could act as memcache
+// shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1.
+func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool {
+ return newBaseConnectionPool(
+ options,
+ func(poolOptions rp.Options) rp.ResourcePool {
+ return rp.NewMultiResourcePool(poolOptions, nil)
+ })
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) NumActive() int32 {
+ return p.pool.NumActive()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) ActiveHighWaterMark() int32 {
+ return p.pool.ActiveHighWaterMark()
+}
+
+// This returns the number of alive idle connections. This method is not part
+// of ConnectionPool's API. It is used only for testing.
+func (p *connectionPoolImpl) NumIdle() int {
+ return p.pool.NumIdle()
+}
+
+// BaseConnectionPool can only register a single (network, address) entry.
+// Register should be called before any Get calls.
+func (p *connectionPoolImpl) Register(network string, address string) error {
+ return p.pool.Register(network + " " + address)
+}
+
+// BaseConnectionPool has nothing to do on Unregister.
+func (p *connectionPoolImpl) Unregister(network string, address string) error {
+ return nil
+}
+
+func (p *connectionPoolImpl) ListRegistered() []NetworkAddress {
+ result := make([]NetworkAddress, 0, 1)
+ for _, location := range p.pool.ListRegistered() {
+ network, address := parseResourceLocation(location)
+
+ result = append(
+ result,
+ NetworkAddress{
+ Network: network,
+ Address: address,
+ })
+ }
+ return result
+}
+
+// This gets an active connection from the connection pool. Note that network
+// and address arguments are ignored (the connections will point to the
+// network/address provided by the first Register call).
+func (p *connectionPoolImpl) Get(
+ network string,
+ address string) (ManagedConn, error) {
+
+ handle, err := p.pool.Get(network + " " + address)
+ if err != nil {
+ return nil, err
+ }
+ return NewManagedConn(network, address, handle, p, p.options), nil
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) Release(conn ManagedConn) error {
+ return conn.ReleaseConnection()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) Discard(conn ManagedConn) error {
+ return conn.DiscardConnection()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) EnterLameDuckMode() {
+ p.pool.EnterLameDuckMode()
+}
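
A usage sketch for the pool: register an endpoint, borrow a connection, and release it back (or discard it on error). The address and limits are illustrative:

    package main

    import (
        "fmt"
        "time"

        "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
    )

    func main() {
        maxIdle := 10 * time.Second
        pool := net2.NewMultiConnectionPool(net2.ConnectionOptions{
            MaxActiveConnections: 4,
            MaxIdleConnections:   1,
            MaxIdleTime:          &maxIdle,
        })
        if err := pool.Register("tcp", "localhost:8080"); err != nil {
            fmt.Println("register:", err)
            return
        }
        conn, err := pool.Get("tcp", "localhost:8080")
        if err != nil {
            fmt.Println("get:", err) // dial failures surface here
            return
        }
        defer conn.ReleaseConnection() // DiscardConnection() if the conn went bad
        fmt.Println("connected to", conn.Key().Address)
    }
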
diff --git a/weed/wdclient/net2/connection_pool.go b/weed/wdclient/net2/connection_pool.go
new file mode 100644
index 000000000..5b8d4d232
--- /dev/null
+++ b/weed/wdclient/net2/connection_pool.go
@@ -0,0 +1,97 @@
+package net2
+
+import (
+ "net"
+ "time"
+)
+
+type ConnectionOptions struct {
+ // The maximum number of connections that can be active per host at any
+ // given time (A non-positive value indicates the number of connections
+ // is unbounded).
+ MaxActiveConnections int32
+
+ // The maximum number of idle connections per host that are kept alive by
+ // the connection pool.
+ MaxIdleConnections uint32
+
+ // The maximum amount of time an idle connection can remain alive (if specified).
+ MaxIdleTime *time.Duration
+
+ // This limits the number of concurrent Dial calls (there's no limit when
+ // DialMaxConcurrency is non-positive).
+ DialMaxConcurrency int
+
+ // Dial specifies the dial function for creating network connections.
+ // If Dial is nil, net.DialTimeout is used, with timeout set to 1 second.
+ Dial func(network string, address string) (net.Conn, error)
+
+ // This specifies the now time function. When the function is non-nil, the
+ // connection pool will use the specified function instead of time.Now to
+ // generate the current time.
+ NowFunc func() time.Time
+
+ // This specifies the timeout for any Read() operation.
+ // Note that setting this to 0 (i.e. not setting it) will make
+ // read operations block indefinitely.
+ ReadTimeout time.Duration
+
+ // This specifies the timeout for any Write() operation.
+ // Note that setting this to 0 (i.e. not setting it) will make
+ // write operations block indefinitely.
+ WriteTimeout time.Duration
+}
+
+func (o ConnectionOptions) getCurrentTime() time.Time {
+ if o.NowFunc == nil {
+ return time.Now()
+ } else {
+ return o.NowFunc()
+ }
+}
+
+// A generic interface for managed connection pool. All connection pool
+// implementations must be threadsafe.
+type ConnectionPool interface {
+ // This returns the number of active connections that are on loan.
+ NumActive() int32
+
+ // This returns the highest number of active connections for the entire
+ // lifetime of the pool.
+ ActiveHighWaterMark() int32
+
+ // This returns the number of idle connections that are in the pool.
+ NumIdle() int
+
+ // This associates (network, address) with the connection pool; after which,
+ // the user can get connections to (network, address).
+ Register(network string, address string) error
+
+ // This dissociates (network, address) from the connection pool;
+ // after which, the user can no longer get connections to
+ // (network, address).
+ Unregister(network string, address string) error
+
+ // This returns the list of registered (network, address) entries.
+ ListRegistered() []NetworkAddress
+
+ // This gets an active connection from the connection pool. The connection
+ // will remain active until one of the following is called:
+ // 1. conn.ReleaseConnection()
+ // 2. conn.DiscardConnection()
+ // 3. pool.Release(conn)
+ // 4. pool.Discard(conn)
+ Get(network string, address string) (ManagedConn, error)
+
+ // This releases an active connection back to the connection pool.
+ Release(conn ManagedConn) error
+
+ // This discards an active connection from the connection pool.
+ Discard(conn ManagedConn) error
+
+ // Enter the connection pool into lame duck mode. The connection pool
+ // will no longer return connections, and all idle connections are closed
+ // immediately (including active connections that are released back to the
+ // pool afterward).
+ EnterLameDuckMode()
+}
diff --git a/weed/wdclient/net2/doc.go b/weed/wdclient/net2/doc.go
new file mode 100644
index 000000000..f4d6552e4
--- /dev/null
+++ b/weed/wdclient/net2/doc.go
@@ -0,0 +1,6 @@
+// Package net2 is a collection of functions meant to supplement the
+// capabilities provided by the standard "net" package.
+package net2
+
+// copied from https://github.com/dropbox/godropbox/tree/master/net2
+// removed other dependencies
\ No newline at end of file
diff --git a/weed/wdclient/net2/ip.go b/weed/wdclient/net2/ip.go
new file mode 100644
index 000000000..60e46342f
--- /dev/null
+++ b/weed/wdclient/net2/ip.go
@@ -0,0 +1,177 @@
+package net2
+
+import (
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "strings"
+ "sync"
+)
+
+var myHostname string
+var myHostnameOnce sync.Once
+
+// Like os.Hostname but caches the first successful result, making it cheap
+// to call over and over.
+// It will also crash the whole process if fetching the hostname fails!
+func MyHostname() string {
+ myHostnameOnce.Do(func() {
+ var err error
+ myHostname, err = os.Hostname()
+ if err != nil {
+ log.Fatal(err)
+ }
+ })
+ return myHostname
+}
+
+var myIp4 *net.IPAddr
+var myIp4Once sync.Once
+
+// Resolves `MyHostname()` to an IPv4 address. Caches the first successful
+// result, making it cheap to call over and over.
+// It will also crash the whole process if resolving the IP fails!
+func MyIp4() *net.IPAddr {
+ myIp4Once.Do(func() {
+ var err error
+ myIp4, err = net.ResolveIPAddr("ip4", MyHostname())
+ if err != nil {
+ log.Fatal(err)
+ }
+ })
+ return myIp4
+}
+
+var myIp6 *net.IPAddr
+var myIp6Once sync.Once
+
+// Resolves `MyHostname()` to an IPv6 address. Caches the first successful
+// result, making it cheap to call over and over.
+// It will also crash the whole process if resolving the IP fails!
+func MyIp6() *net.IPAddr {
+ myIp6Once.Do(func() {
+ var err error
+ myIp6, err = net.ResolveIPAddr("ip6", MyHostname())
+ if err != nil {
+ log.Fatal(err)
+ }
+ })
+ return myIp6
+}
+
+// This returns the list of local IP addresses that other hosts can connect
+// to (NOTE: loopback IPs are ignored).
+// It also resolves the hostname to an address and adds that to the list, so
+// IPs from /etc/hosts work too.
+func GetLocalIPs() ([]*net.IP, error) {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return nil, fmt.Errorf("Failed to lookup hostname: %v", err)
+ }
+ // Resolve the IP address from the hostname; this way, overrides in
+ // /etc/hosts also take effect for IP resolution.
+ ipInfo, err := net.ResolveIPAddr("ip4", hostname)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to resolve ip: %v", err)
+ }
+ ips := []*net.IP{&ipInfo.IP}
+
+ // TODO(zviad): Is rest of the code really necessary?
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ return nil, fmt.Errorf("Failed to get interface addresses: %v", err)
+ }
+ for _, addr := range addrs {
+ ipnet, ok := addr.(*net.IPNet)
+ if !ok {
+ continue
+ }
+
+ if ipnet.IP.IsLoopback() {
+ continue
+ }
+
+ ips = append(ips, &ipnet.IP)
+ }
+ return ips, nil
+}
+
+var localhostIPNets []*net.IPNet
+
+func init() {
+ for _, mask := range []string{"127.0.0.1/8", "::1/128"} {
+ _, ipnet, err := net.ParseCIDR(mask)
+ if err != nil {
+ panic(err)
+ }
+ localhostIPNets = append(localhostIPNets, ipnet)
+ }
+}
+
+func IsLocalhostIp(ipStr string) bool {
+ ip := net.ParseIP(ipStr)
+ if ip == nil {
+ return false
+ }
+ for _, ipnet := range localhostIPNets {
+ if ipnet.Contains(ip) {
+ return true
+ }
+ }
+ return false
+}
+
+// Given a host string, return true if the host is an ip (v4/v6) localhost.
+func IsLocalhost(host string) bool {
+ return IsLocalhostIp(host) ||
+ host == "localhost" ||
+ host == "ip6-localhost" ||
+ host == "ipv6-localhost"
+}
+
+// Resolves hostnames in addresses to actual IP4 addresses. Skips all invalid addresses
+// and all addresses that can't be resolved.
+// `addrs` are assumed to be of the form: ["<hostname>:<port>", ...]
+// Returns an error in addition to resolved addresses if not all resolutions succeed.
+func ResolveIP4s(addrs []string) ([]string, error) {
+ resolvedAddrs := make([]string, 0, len(addrs))
+ var lastErr error
+
+ for _, server := range addrs {
+ hostPort := strings.Split(server, ":")
+ if len(hostPort) != 2 {
+ lastErr = fmt.Errorf("Skipping invalid address: %s", server)
+ continue
+ }
+
+ ip, err := net.ResolveIPAddr("ip4", hostPort[0])
+ if err != nil {
+ lastErr = err
+ continue
+ }
+ resolvedAddrs = append(resolvedAddrs, ip.IP.String()+":"+hostPort[1])
+ }
+ return resolvedAddrs, lastErr
+}
+
+func LookupValidAddrs() (map[string]bool, error) {
+ hostName, err := os.Hostname()
+ if err != nil {
+ return nil, err
+ }
+ addrs, err := net.LookupHost(hostName)
+ if err != nil {
+ return nil, err
+ }
+ validAddrs := make(map[string]bool)
+ validAddrs[hostName] = true
+ for _, addr := range addrs {
+ validAddrs[addr] = true
+ }
+ // Special-case localhost/127.0.0.1 so that this works on dev VMs. It should
+ // have no effect in production.
+ validAddrs["127.0.0.1"] = true
+ validAddrs["localhost"] = true
+ return validAddrs, nil
+}
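
A small usage sketch for the resolver helper: invalid entries and unresolvable hosts are skipped, and the last failure is reported alongside whatever did resolve:

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
    )

    func main() {
        resolved, err := net2.ResolveIP4s([]string{
            "localhost:9333",
            "no-such-host.invalid:9333", // skipped; sets err
        })
        fmt.Println(resolved, err)
    }
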
diff --git a/weed/wdclient/net2/managed_connection.go b/weed/wdclient/net2/managed_connection.go
new file mode 100644
index 000000000..a886210d1
--- /dev/null
+++ b/weed/wdclient/net2/managed_connection.go
@@ -0,0 +1,185 @@
+package net2
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "errors"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
+)
+
+// Dial's arguments.
+type NetworkAddress struct {
+ Network string
+ Address string
+}
+
+// A connection managed by a connection pool. NOTE: SetDeadline,
+// SetReadDeadline and SetWriteDeadline are disabled for managed connections.
+// (The deadlines are set by the connection pool).
+type ManagedConn interface {
+ net.Conn
+
+ // This returns the original (network, address) entry used for creating
+ // the connection.
+ Key() NetworkAddress
+
+ // This returns the underlying net.Conn implementation.
+ RawConn() net.Conn
+
+ // This returns the connection pool which owns this connection.
+ Owner() ConnectionPool
+
+ // This indicates a user is done with the connection and releases the
+ // connection back to the connection pool.
+ ReleaseConnection() error
+
+ // This indicates the connection is in an invalid state, and that the
+ // connection should be discarded from the connection pool.
+ DiscardConnection() error
+}
+
+// A concrete implementation of ManagedConn
+type managedConnImpl struct {
+ addr NetworkAddress
+ handle resource_pool.ManagedHandle
+ pool ConnectionPool
+ options ConnectionOptions
+}
+
+// This creates a managed connection wrapper.
+func NewManagedConn(
+ network string,
+ address string,
+ handle resource_pool.ManagedHandle,
+ pool ConnectionPool,
+ options ConnectionOptions) ManagedConn {
+
+ addr := NetworkAddress{
+ Network: network,
+ Address: address,
+ }
+
+ return &managedConnImpl{
+ addr: addr,
+ handle: handle,
+ pool: pool,
+ options: options,
+ }
+}
+
+func (c *managedConnImpl) rawConn() (net.Conn, error) {
+ h, err := c.handle.Handle()
+ return h.(net.Conn), err
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) RawConn() net.Conn {
+ h, _ := c.handle.Handle()
+ return h.(net.Conn)
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) Key() NetworkAddress {
+ return c.addr
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) Owner() ConnectionPool {
+ return c.pool
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) ReleaseConnection() error {
+ return c.handle.Release()
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) DiscardConnection() error {
+ return c.handle.Discard()
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) Read(b []byte) (n int, err error) {
+ conn, err := c.rawConn()
+ if err != nil {
+ return 0, err
+ }
+
+ if c.options.ReadTimeout > 0 {
+ deadline := c.options.getCurrentTime().Add(c.options.ReadTimeout)
+ _ = conn.SetReadDeadline(deadline)
+ }
+ n, err = conn.Read(b)
+ if err != nil {
+ var localAddr string
+ if conn.LocalAddr() != nil {
+ localAddr = conn.LocalAddr().String()
+ } else {
+ localAddr = "(nil)"
+ }
+
+ var remoteAddr string
+ if conn.RemoteAddr() != nil {
+ remoteAddr = conn.RemoteAddr().String()
+ } else {
+ remoteAddr = "(nil)"
+ }
+ err = fmt.Errorf("Read error from host: %s <-> %s: %v", localAddr, remoteAddr, err)
+ }
+ return
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) Write(b []byte) (n int, err error) {
+ conn, err := c.rawConn()
+ if err != nil {
+ return 0, err
+ }
+
+ if c.options.WriteTimeout > 0 {
+ deadline := c.options.getCurrentTime().Add(c.options.WriteTimeout)
+ _ = conn.SetWriteDeadline(deadline)
+ }
+ n, err = conn.Write(b)
+ if err != nil {
+ err = fmt.Errorf("Write error: %v", err)
+ }
+ return
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) Close() error {
+ return c.handle.Discard()
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) LocalAddr() net.Addr {
+ conn, _ := c.rawConn()
+ return conn.LocalAddr()
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) RemoteAddr() net.Addr {
+ conn, _ := c.rawConn()
+ return conn.RemoteAddr()
+}
+
+// SetDeadline is disabled for managed connection (The deadline is set by
+// us, with respect to the read/write timeouts specified in ConnectionOptions).
+func (c *managedConnImpl) SetDeadline(t time.Time) error {
+ return errors.New("Cannot set deadline for managed connection")
+}
+
+// SetReadDeadline is disabled for managed connection (The deadline is set by
+// us with respect to the read timeout specified in ConnectionOptions).
+func (c *managedConnImpl) SetReadDeadline(t time.Time) error {
+ return errors.New("Cannot set read deadline for managed connection")
+}
+
+// SetWriteDeadline is disabled for managed connection (The deadline is set by
+// us with respect to the write timeout specified in ConnectionOptions).
+func (c *managedConnImpl) SetWriteDeadline(t time.Time) error {
+ return errors.New("Cannot set write deadline for managed connection")
+}
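
The deadline methods are disabled on purpose: the pool stamps a fresh deadline before every Read and Write from ReadTimeout/WriteTimeout. A sketch showing both behaviors (endpoint illustrative):

    package main

    import (
        "fmt"
        "time"

        "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
    )

    func main() {
        pool := net2.NewSimpleConnectionPool(net2.ConnectionOptions{
            ReadTimeout:  500 * time.Millisecond, // applied before each Read
            WriteTimeout: 500 * time.Millisecond, // applied before each Write
        })
        _ = pool.Register("tcp", "localhost:8080")
        conn, err := pool.Get("tcp", "localhost:8080")
        if err != nil {
            fmt.Println("get:", err)
            return
        }
        defer conn.ReleaseConnection()
        fmt.Println(conn.SetDeadline(time.Now())) // always an error: deadlines are pool-managed
    }
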
diff --git a/weed/wdclient/net2/port.go b/weed/wdclient/net2/port.go
new file mode 100644
index 000000000..f83adba28
--- /dev/null
+++ b/weed/wdclient/net2/port.go
@@ -0,0 +1,19 @@
+package net2
+
+import (
+ "net"
+ "strconv"
+)
+
+// Returns the port information.
+func GetPort(addr net.Addr) (int, error) {
+ _, lport, err := net.SplitHostPort(addr.String())
+ if err != nil {
+ return -1, err
+ }
+ lportInt, err := strconv.Atoi(lport)
+ if err != nil {
+ return -1, err
+ }
+ return lportInt, nil
+}
diff --git a/weed/wdclient/resource_pool/doc.go b/weed/wdclient/resource_pool/doc.go
new file mode 100644
index 000000000..c17b17c6c
--- /dev/null
+++ b/weed/wdclient/resource_pool/doc.go
@@ -0,0 +1,5 @@
+// Package resource_pool implements a generic resource pool for managing
+// resources such as network connections.
+package resource_pool
+
+// copied from https://github.com/dropbox/godropbox/tree/master/resource_pool
+// removed other dependencies
\ No newline at end of file
diff --git a/weed/wdclient/resource_pool/managed_handle.go b/weed/wdclient/resource_pool/managed_handle.go
new file mode 100644
index 000000000..e1d82ca7b
--- /dev/null
+++ b/weed/wdclient/resource_pool/managed_handle.go
@@ -0,0 +1,97 @@
+package resource_pool
+
+import (
+ "sync/atomic"
+
+ "errors"
+)
+
+// A resource handle managed by a resource pool.
+type ManagedHandle interface {
+ // This returns the handle's resource location.
+ ResourceLocation() string
+
+ // This returns the underlying resource handle (or error if the handle
+ // is no longer active).
+ Handle() (interface{}, error)
+
+ // This returns the resource pool which owns this handle.
+ Owner() ResourcePool
+
+ // This releases the underlying resource handle to the caller and marks the
+ // managed handle as inactive. The caller is responsible for cleaning up
+ // the released handle. This returns nil if the managed handle no longer
+ // owns the resource.
+ ReleaseUnderlyingHandle() interface{}
+
+ // This indicates a user is done with the handle and releases the handle
+ // back to the resource pool.
+ Release() error
+
+ // This indicates the handle is in an invalid state, and that the
+ // handle should be discarded from the resource pool.
+ Discard() error
+}
+
+// A concrete implementation of ManagedHandle
+type managedHandleImpl struct {
+ location string
+ handle interface{}
+ pool ResourcePool
+ isActive int32 // atomic bool
+ options Options
+}
+
+// This creates a managed handle wrapper.
+func NewManagedHandle(
+ resourceLocation string,
+ handle interface{},
+ pool ResourcePool,
+ options Options) ManagedHandle {
+
+ h := &managedHandleImpl{
+ location: resourceLocation,
+ handle: handle,
+ pool: pool,
+ options: options,
+ }
+ atomic.StoreInt32(&h.isActive, 1)
+
+ return h
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) ResourceLocation() string {
+ return c.location
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Handle() (interface{}, error) {
+ if atomic.LoadInt32(&c.isActive) == 0 {
+ return c.handle, errors.New("Resource handle is no longer valid")
+ }
+ return c.handle, nil
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Owner() ResourcePool {
+ return c.pool
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) ReleaseUnderlyingHandle() interface{} {
+ if atomic.CompareAndSwapInt32(&c.isActive, 1, 0) {
+ return c.handle
+ }
+ return nil
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Release() error {
+ return c.pool.Release(c)
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Discard() error {
+ return c.pool.Discard(c)
+}
diff --git a/weed/wdclient/resource_pool/multi_resource_pool.go b/weed/wdclient/resource_pool/multi_resource_pool.go
new file mode 100644
index 000000000..9ac25526d
--- /dev/null
+++ b/weed/wdclient/resource_pool/multi_resource_pool.go
@@ -0,0 +1,200 @@
+package resource_pool
+
+import (
+ "fmt"
+ "sync"
+
+ "errors"
+)
+
+// A resource pool implementation that manages multiple resource location
+// entries. The handles to each resource location entry act independently.
+// For example "tcp localhost:11211" could act as memcache
+// shard 0 and "tcp localhost:11212" could act as memcache shard 1.
+type multiResourcePool struct {
+ options Options
+
+ createPool func(Options) ResourcePool
+
+ rwMutex sync.RWMutex
+ isLameDuck bool // guarded by rwMutex
+ // NOTE: the locationPools is guarded by rwMutex, but the pool entries
+ // are not.
+ locationPools map[string]ResourcePool
+}
+
+// This returns a MultiResourcePool, which manages multiple
+// resource location entries. The handles to each resource location
+// entry act independently.
+//
+// When createPool is nil, NewSimpleResourcePool is used as default.
+func NewMultiResourcePool(
+ options Options,
+ createPool func(Options) ResourcePool) ResourcePool {
+
+ if createPool == nil {
+ createPool = NewSimpleResourcePool
+ }
+
+ return &multiResourcePool{
+ options: options,
+ createPool: createPool,
+ rwMutex: sync.RWMutex{},
+ isLameDuck: false,
+ locationPools: make(map[string]ResourcePool),
+ }
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) NumActive() int32 {
+ total := int32(0)
+
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ for _, pool := range p.locationPools {
+ total += pool.NumActive()
+ }
+ return total
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) ActiveHighWaterMark() int32 {
+ high := int32(0)
+
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ for _, pool := range p.locationPools {
+ val := pool.ActiveHighWaterMark()
+ if val > high {
+ high = val
+ }
+ }
+ return high
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) NumIdle() int {
+ total := 0
+
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ for _, pool := range p.locationPools {
+ total += pool.NumIdle()
+ }
+ return total
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Register(resourceLocation string) error {
+ if resourceLocation == "" {
+ return errors.New("Registering invalid resource location")
+ }
+
+ p.rwMutex.Lock()
+ defer p.rwMutex.Unlock()
+
+ if p.isLameDuck {
+ return fmt.Errorf(
+ "Cannot register %s to lame duck resource pool",
+ resourceLocation)
+ }
+
+ if _, inMap := p.locationPools[resourceLocation]; inMap {
+ return nil
+ }
+
+ pool := p.createPool(p.options)
+ if err := pool.Register(resourceLocation); err != nil {
+ return err
+ }
+
+ p.locationPools[resourceLocation] = pool
+ return nil
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Unregister(resourceLocation string) error {
+ p.rwMutex.Lock()
+ defer p.rwMutex.Unlock()
+
+ if pool, inMap := p.locationPools[resourceLocation]; inMap {
+ _ = pool.Unregister("")
+ pool.EnterLameDuckMode()
+ delete(p.locationPools, resourceLocation)
+ }
+ return nil
+}
+
+func (p *multiResourcePool) ListRegistered() []string {
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ result := make([]string, 0, len(p.locationPools))
+ for key := range p.locationPools {
+ result = append(result, key)
+ }
+
+ return result
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Get(
+ resourceLocation string) (ManagedHandle, error) {
+
+ pool := p.getPool(resourceLocation)
+ if pool == nil {
+ return nil, fmt.Errorf(
+ "%s is not registered in the resource pool",
+ resourceLocation)
+ }
+ return pool.Get(resourceLocation)
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Release(handle ManagedHandle) error {
+ pool := p.getPool(handle.ResourceLocation())
+ if pool == nil {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ return pool.Release(handle)
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Discard(handle ManagedHandle) error {
+ pool := p.getPool(handle.ResourceLocation())
+ if pool == nil {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ return pool.Discard(handle)
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) EnterLameDuckMode() {
+ p.rwMutex.Lock()
+ defer p.rwMutex.Unlock()
+
+ p.isLameDuck = true
+
+ for _, pool := range p.locationPools {
+ pool.EnterLameDuckMode()
+ }
+}
+
+func (p *multiResourcePool) getPool(resourceLocation string) ResourcePool {
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ if pool, inMap := p.locationPools[resourceLocation]; inMap {
+ return pool
+ }
+ return nil
+}
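
A usage sketch at the resource_pool level: each registered location gets its own sub-pool, here opening a trivial string "resource" (the Open/Close functions are illustrative):

    package main

    import (
        "fmt"

        rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
    )

    func main() {
        pool := rp.NewMultiResourcePool(rp.Options{
            MaxIdleHandles: 1,
            Open: func(loc string) (interface{}, error) {
                return "handle for " + loc, nil
            },
            Close: func(handle interface{}) error { return nil },
        }, nil) // nil createPool defaults to NewSimpleResourcePool
        _ = pool.Register("shard-0")
        _ = pool.Register("shard-1")
        h, err := pool.Get("shard-0")
        if err != nil {
            fmt.Println(err)
            return
        }
        raw, _ := h.Handle()
        fmt.Println(raw) // handle for shard-0
        _ = h.Release()
    }
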
diff --git a/weed/wdclient/resource_pool/resource_pool.go b/weed/wdclient/resource_pool/resource_pool.go
new file mode 100644
index 000000000..26c433f50
--- /dev/null
+++ b/weed/wdclient/resource_pool/resource_pool.go
@@ -0,0 +1,96 @@
+package resource_pool
+
+import (
+ "time"
+)
+
+type Options struct {
+ // The maximum number of active resource handles per resource location. (A
+ // non-positive value indicates the number of active resource handles is
+ // unbounded).
+ MaxActiveHandles int32
+
+ // The maximum number of idle resource handles per resource location that
+ // are kept alive by the resource pool.
+ MaxIdleHandles uint32
+
+ // The maximum amount of time an idle resource handle can remain alive (if
+ // specified).
+ MaxIdleTime *time.Duration
+
+ // This limits the number of concurrent Open calls (there's no limit when
+ // OpenMaxConcurrency is non-positive).
+ OpenMaxConcurrency int
+
+ // This function creates a resource handle (e.g., a connection) for a
+ // resource location. The function must be thread-safe.
+ Open func(resourceLocation string) (
+ handle interface{},
+ err error)
+
+ // This function destroys a resource handle and performs the necessary
+ // cleanup to free up resources. The function must be thread-safe.
+ Close func(handle interface{}) error
+
+ // This specifies the now time function. When the function is non-nil, the
+ // resource pool will use the specified function instead of time.Now to
+ // generate the current time.
+ NowFunc func() time.Time
+}
+
+func (o Options) getCurrentTime() time.Time {
+ if o.NowFunc == nil {
+ return time.Now()
+ } else {
+ return o.NowFunc()
+ }
+}
+
+// A generic interface for managed resource pool. All resource pool
+// implementations must be threadsafe.
+type ResourcePool interface {
+ // This returns the number of active resource handles.
+ NumActive() int32
+
+ // This returns the highest number of actives handles for the entire
+ // lifetime of the pool. If the pool contains multiple sub-pools, the
+ // high water mark is the max of the sub-pools' high water marks.
+ ActiveHighWaterMark() int32
+
+ // This returns the number of alive idle handles. NOTE: This is only used
+ // for testing.
+ NumIdle() int
+
+ // This associates a resource location with the resource pool; after which,
+ // the user can get resource handles for the resource location.
+ Register(resourceLocation string) error
+
+ // This dissociates a resource location from the resource pool; after which,
+ // the user can no longer get resource handles for the resource location.
+ // If the given resource location corresponds to a sub-pool, the unregistered
+ // sub-pool will enter lame duck mode.
+ Unregister(resourceLocation string) error
+
+ // This returns the list of registered resource location entries.
+ ListRegistered() []string
+
+ // This gets an active resource handle from the resource pool. The
+ // handle will remain active until one of the following is called:
+ // 1. handle.Release()
+ // 2. handle.Discard()
+ // 3. pool.Release(handle)
+ // 4. pool.Discard(handle)
+ Get(key string) (ManagedHandle, error)
+
+ // This releases an active resource handle back to the resource pool.
+ Release(handle ManagedHandle) error
+
+ // This discards an active resource from the resource pool.
+ Discard(handle ManagedHandle) error
+
+ // Enter the resource pool into lame duck mode. The resource pool
+ // will no longer return resource handles, and all idle resource handles
+ // are closed immediately (including active resource handles that are
+ // released back to the pool afterward).
+ EnterLameDuckMode()
+}
diff --git a/weed/wdclient/resource_pool/semaphore.go b/weed/wdclient/resource_pool/semaphore.go
new file mode 100644
index 000000000..ff35d5bc5
--- /dev/null
+++ b/weed/wdclient/resource_pool/semaphore.go
@@ -0,0 +1,154 @@
+package resource_pool
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type Semaphore interface {
+ // Increment the semaphore counter by one.
+ Release()
+
+ // Decrement the semaphore counter by one, and block if counter < 0
+ Acquire()
+
+ // Decrement the semaphore counter by one, and block if counter < 0
+ // Wait for up to the given duration. Returns true if it did not time out.
+ TryAcquire(timeout time.Duration) bool
+}
+
+// A simple counting Semaphore.
+type boundedSemaphore struct {
+ slots chan struct{}
+}
+
+// Create a bounded semaphore. The count parameter must be a positive number.
+// NOTE: The bounded semaphore will panic if the user tries to Release
+// beyond the specified count.
+func NewBoundedSemaphore(count uint) Semaphore {
+ sem := &boundedSemaphore{
+ slots: make(chan struct{}, int(count)),
+ }
+ for i := 0; i < cap(sem.slots); i++ {
+ sem.slots <- struct{}{}
+ }
+ return sem
+}
+
+// Acquire returns on successful acquisition.
+func (sem *boundedSemaphore) Acquire() {
+ <-sem.slots
+}
+
+// TryAcquire returns true if it acquires a resource slot within the
+// timeout, false otherwise.
+func (sem *boundedSemaphore) TryAcquire(timeout time.Duration) bool {
+ if timeout > 0 {
+ // Wait until we get a slot or timeout expires.
+ tm := time.NewTimer(timeout)
+ defer tm.Stop()
+ select {
+ case <-sem.slots:
+ return true
+ case <-tm.C:
+ // Timeout expired. In very rare cases this might happen even if
+ // there is a slot available, e.g. GC pause after we create the timer
+ // and select randomly picked this one out of the two available channels.
+ // We should do one final immediate check below.
+ }
+ }
+
+ // Return true if we have a slot available immediately and false otherwise.
+ select {
+ case <-sem.slots:
+ return true
+ default:
+ return false
+ }
+}
+
+// Release the acquired semaphore. You must not release more than you
+// have acquired.
+func (sem *boundedSemaphore) Release() {
+ select {
+ case sem.slots <- struct{}{}:
+ default:
+ // slots is buffered. If a send blocks, it indicates a programming
+ // error.
+ panic(fmt.Errorf("too many releases for boundedSemaphore"))
+ }
+}
+
+// This returns an unbounded counting semaphore with the specified initial
+// count. The semaphore counter can be arbitrarily large (i.e., Release can
+// be called an unlimited number of times).
+//
+// NOTE: In general, users should use the bounded semaphore since it is more
+// efficient than the unbounded one.
+func NewUnboundedSemaphore(initialCount int) Semaphore {
+ res := &unboundedSemaphore{
+ counter: int64(initialCount),
+ }
+ res.cond.L = &res.lock
+ return res
+}
+
+type unboundedSemaphore struct {
+ lock sync.Mutex
+ cond sync.Cond
+ counter int64
+}
+
+func (s *unboundedSemaphore) Release() {
+ s.lock.Lock()
+ s.counter += 1
+ if s.counter > 0 {
+ // Not broadcasting here since it's unlikely we can satisfy all waiting
+ // goroutines. Instead, we Signal again if there is leftover
+ // quota after Acquire, in case of lost wakeups.
+ s.cond.Signal()
+ }
+ s.lock.Unlock()
+}
+
+func (s *unboundedSemaphore) Acquire() {
+ s.lock.Lock()
+ for s.counter < 1 {
+ s.cond.Wait()
+ }
+ s.counter -= 1
+ if s.counter > 0 {
+ s.cond.Signal()
+ }
+ s.lock.Unlock()
+}
+
+func (s *unboundedSemaphore) TryAcquire(timeout time.Duration) bool {
+ done := make(chan bool, 1)
+ // Gate used to communicate between the goroutines and decide the result:
+ // if the main goroutine decides first, we timed out; otherwise we succeeded.
+ decided := new(int32)
+ atomic.StoreInt32(decided, 0)
+ go func() {
+ s.Acquire()
+ if atomic.SwapInt32(decided, 1) == 0 {
+ // Acquire won the race
+ done <- true
+ } else {
+ // The result was already decided and this goroutine lost the race, so undo the acquire
+ s.Release()
+ }
+ }()
+ select {
+ case <-done:
+ return true
+ case <-time.After(timeout):
+ if atomic.SwapInt32(decided, 1) == 1 {
+ // The other thread already decided the result
+ return true
+ }
+ return false
+ }
+}
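
A usage sketch for the bounded variant, which is how OpenMaxConcurrency is enforced further down: two slots are taken, so a third acquisition can only succeed after a Release:

    package main

    import (
        "fmt"
        "time"

        rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
    )

    func main() {
        sem := rp.NewBoundedSemaphore(2)
        sem.Acquire()
        sem.Acquire()
        if !sem.TryAcquire(10 * time.Millisecond) {
            fmt.Println("limit reached; would block") // expected path
        }
        sem.Release()
        sem.Release()
    }
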
diff --git a/weed/wdclient/resource_pool/simple_resource_pool.go b/weed/wdclient/resource_pool/simple_resource_pool.go
new file mode 100644
index 000000000..b0c539100
--- /dev/null
+++ b/weed/wdclient/resource_pool/simple_resource_pool.go
@@ -0,0 +1,343 @@
+package resource_pool
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type idleHandle struct {
+ handle interface{}
+ keepUntil *time.Time
+}
+
+type TooManyHandles struct {
+ location string
+}
+
+func (t TooManyHandles) Error() string {
+ return fmt.Sprintf("Too many handles to %s", t.location)
+}
+
+type OpenHandleError struct {
+ location string
+ err error
+}
+
+func (o OpenHandleError) Error() string {
+ return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err)
+}
+
+// A resource pool implementation where all handles are associated with the
+// same resource location.
+type simpleResourcePool struct {
+ options Options
+
+ numActive *int32 // atomic counter
+
+ activeHighWaterMark *int32 // atomic / monotonically increasing value
+
+ openTokens Semaphore
+
+ mutex sync.Mutex
+ location string // guarded by mutex
+ idleHandles []*idleHandle // guarded by mutex
+ isLameDuck bool // guarded by mutex
+}
+
+// This returns a SimpleResourcePool, where all handles are associated to a
+// single resource location.
+func NewSimpleResourcePool(options Options) ResourcePool {
+ numActive := new(int32)
+ atomic.StoreInt32(numActive, 0)
+
+ activeHighWaterMark := new(int32)
+ atomic.StoreInt32(activeHighWaterMark, 0)
+
+ var tokens Semaphore
+ if options.OpenMaxConcurrency > 0 {
+ tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency))
+ }
+
+ return &simpleResourcePool{
+ location: "",
+ options: options,
+ numActive: numActive,
+ activeHighWaterMark: activeHighWaterMark,
+ openTokens: tokens,
+ mutex: sync.Mutex{},
+ idleHandles: make([]*idleHandle, 0),
+ isLameDuck: false,
+ }
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) NumActive() int32 {
+ return atomic.LoadInt32(p.numActive)
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) ActiveHighWaterMark() int32 {
+ return atomic.LoadInt32(p.activeHighWaterMark)
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) NumIdle() int {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+ return len(p.idleHandles)
+}
+
+// SimpleResourcePool can only register a single resource location.
+// Register should be called before any Get calls.
+func (p *simpleResourcePool) Register(resourceLocation string) error {
+ if resourceLocation == "" {
+ return errors.New("Invalid resource location")
+ }
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.isLameDuck {
+ return fmt.Errorf(
+ "cannot register %s to lame duck resource pool",
+ resourceLocation)
+ }
+
+ if p.location == "" {
+ p.location = resourceLocation
+ return nil
+ }
+ return errors.New("SimpleResourcePool can only register one location")
+}
+
+// SimpleResourcePool will enter lame duck mode upon calling Unregister.
+func (p *simpleResourcePool) Unregister(resourceLocation string) error {
+ p.EnterLameDuckMode()
+ return nil
+}
+
+func (p *simpleResourcePool) ListRegistered() []string {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.location != "" {
+ return []string{p.location}
+ }
+ return []string{}
+}
+
+func (p *simpleResourcePool) getLocation() (string, error) {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.location == "" {
+ return "", fmt.Errorf(
+ "resource location is not set for SimpleResourcePool")
+ }
+
+ if p.isLameDuck {
+ return "", fmt.Errorf(
+ "lame duck resource pool cannot return handles to %s",
+ p.location)
+ }
+
+ return p.location, nil
+}
+
+// This gets an active resource from the resource pool. Note that the
+// resourceLocation argument is ignored (the handles are associated with the
+// resource location provided by the first Register call).
+func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) {
+ activeCount := atomic.AddInt32(p.numActive, 1)
+ if p.options.MaxActiveHandles > 0 &&
+ activeCount > p.options.MaxActiveHandles {
+
+ atomic.AddInt32(p.numActive, -1)
+ return nil, TooManyHandles{p.location}
+ }
+
+ highest := atomic.LoadInt32(p.activeHighWaterMark)
+ for activeCount > highest &&
+ !atomic.CompareAndSwapInt32(
+ p.activeHighWaterMark,
+ highest,
+ activeCount) {
+
+ highest = atomic.LoadInt32(p.activeHighWaterMark)
+ }
+
+ if h := p.getIdleHandle(); h != nil {
+ return h, nil
+ }
+
+ location, err := p.getLocation()
+ if err != nil {
+ atomic.AddInt32(p.numActive, -1)
+ return nil, err
+ }
+
+ if p.openTokens != nil {
+ // Current implementation does not wait for tokens to become available.
+ // If that causes availability hits, we could increase the wait,
+ // similar to simple_pool.go.
+ if p.openTokens.TryAcquire(0) {
+ defer p.openTokens.Release()
+ } else {
+ // We could not immediately acquire a token; instead of
+ // waiting, fail fast so the caller can retry.
+ atomic.AddInt32(p.numActive, -1)
+ return nil, OpenHandleError{
+ p.location, errors.New("Open Error: reached OpenMaxConcurrency")}
+ }
+ }
+
+ handle, err := p.options.Open(location)
+ if err != nil {
+ atomic.AddInt32(p.numActive, -1)
+ return nil, OpenHandleError{p.location, err}
+ }
+
+ return NewManagedHandle(p.location, handle, p, p.options), nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) Release(handle ManagedHandle) error {
+ if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ h := handle.ReleaseUnderlyingHandle()
+ if h != nil {
+ // We can unref either before or after queuing the idle handle.
+ // The advantage of unref-ing before queuing is that there is
+ // a higher chance of successful Get when number of active handles
+ // is close to the limit (but potentially more handle creation).
+ // The advantage of queuing before unref-ing is that there's a
+ // higher chance of reusing handle (but potentially more Get failures).
+ atomic.AddInt32(p.numActive, -1)
+ p.queueIdleHandles(h)
+ }
+
+ return nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) Discard(handle ManagedHandle) error {
+ if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ h := handle.ReleaseUnderlyingHandle()
+ if h != nil {
+ atomic.AddInt32(p.numActive, -1)
+ if err := p.options.Close(h); err != nil {
+ return fmt.Errorf("failed to close resource handle: %v", err)
+ }
+ }
+ return nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) EnterLameDuckMode() {
+ p.mutex.Lock()
+
+ toClose := p.idleHandles
+ p.isLameDuck = true
+ p.idleHandles = []*idleHandle{}
+
+ p.mutex.Unlock()
+
+ p.closeHandles(toClose)
+}
+
+// This returns an idle resource, if there is one.
+func (p *simpleResourcePool) getIdleHandle() ManagedHandle {
+ var toClose []*idleHandle
+ defer func() {
+ // NOTE: Must keep the closure around to late bind the toClose slice.
+ p.closeHandles(toClose)
+ }()
+
+ now := p.options.getCurrentTime()
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ var i int
+ for i = 0; i < len(p.idleHandles); i++ {
+ idle := p.idleHandles[i]
+ if idle.keepUntil == nil || now.Before(*idle.keepUntil) {
+ break
+ }
+ }
+ if i > 0 {
+ toClose = p.idleHandles[0:i]
+ }
+
+ if i < len(p.idleHandles) {
+ idle := p.idleHandles[i]
+ p.idleHandles = p.idleHandles[i+1:]
+ return NewManagedHandle(p.location, idle.handle, p, p.options)
+ }
+
+ if len(p.idleHandles) > 0 {
+ p.idleHandles = []*idleHandle{}
+ }
+ return nil
+}
+
+// This adds an idle resource to the pool.
+func (p *simpleResourcePool) queueIdleHandles(handle interface{}) {
+ var toClose []*idleHandle
+ defer func() {
+ // NOTE: Must keep the closure around to late bind the toClose slice.
+ p.closeHandles(toClose)
+ }()
+
+ now := p.options.getCurrentTime()
+ var keepUntil *time.Time
+ if p.options.MaxIdleTime != nil {
+ // NOTE: Assign to temp variable first to work around compiler bug
+ x := now.Add(*p.options.MaxIdleTime)
+ keepUntil = &x
+ }
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.isLameDuck {
+ toClose = []*idleHandle{
+ {handle: handle},
+ }
+ return
+ }
+
+ p.idleHandles = append(
+ p.idleHandles,
+ &idleHandle{
+ handle: handle,
+ keepUntil: keepUntil,
+ })
+
+ nIdleHandles := uint32(len(p.idleHandles))
+ if nIdleHandles > p.options.MaxIdleHandles {
+ handlesToClose := nIdleHandles - p.options.MaxIdleHandles
+ toClose = p.idleHandles[0:handlesToClose]
+ p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles]
+ }
+}
+
+// Closes resources; at this point it is assumed that these resources
+// are no longer referenced from the main idleHandles slice.
+func (p *simpleResourcePool) closeHandles(handles []*idleHandle) {
+ for _, handle := range handles {
+ _ = p.options.Close(handle.handle)
+ }
+}
diff --git a/weed/wdclient/volume_tcp_client.go b/weed/wdclient/volume_tcp_client.go
new file mode 100644
index 000000000..afebd71eb
--- /dev/null
+++ b/weed/wdclient/volume_tcp_client.go
@@ -0,0 +1,97 @@
+package wdclient
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
+ "io"
+ "net"
+ "time"
+)
+
+// VolumeTcpClient puts/gets/deletes file chunks directly on volume servers without replication
+type VolumeTcpClient struct {
+ cp net2.ConnectionPool
+}
+
+type VolumeTcpConn struct {
+ net.Conn
+ bufWriter *bufio.Writer
+ bufReader *bufio.Reader
+}
+
+func NewVolumeTcpClient() *VolumeTcpClient {
+ MaxIdleTime := 10 * time.Second
+ return &VolumeTcpClient{
+ cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{
+ MaxActiveConnections: 16,
+ MaxIdleConnections: 1,
+ MaxIdleTime: &MaxIdleTime,
+ DialMaxConcurrency: 0,
+ Dial: func(network string, address string) (net.Conn, error) {
+ conn, err := net.Dial(network, address)
+ return &VolumeTcpConn{
+ conn,
+ bufio.NewWriter(conn),
+ bufio.NewReader(conn),
+ }, err
+ },
+ NowFunc: nil,
+ ReadTimeout: 0,
+ WriteTimeout: 0,
+ }),
+ }
+}
+func (c *VolumeTcpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
+
+ tcpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20000)
+ if parseErr != nil {
+ return parseErr
+ }
+
+ c.cp.Register("tcp", tcpAddress)
+ tcpConn, getErr := c.cp.Get("tcp", tcpAddress)
+ if getErr != nil {
+ return fmt.Errorf("get connection to %s: %v", tcpAddress, getErr)
+ }
+ conn := tcpConn.RawConn().(*VolumeTcpConn)
+ defer func() {
+ if err != nil {
+ tcpConn.DiscardConnection()
+ } else {
+ tcpConn.ReleaseConnection()
+ }
+ }()
+
+ buf := []byte("+" + fileId + "\n")
+ _, err = conn.bufWriter.Write(buf)
+ if err != nil {
+ return
+ }
+ util.Uint32toBytes(buf[0:4], fileSize)
+ _, err = conn.bufWriter.Write(buf[0:4])
+ if err != nil {
+ return
+ }
+ _, err = io.Copy(conn.bufWriter, fileReader)
+ if err != nil {
+ return
+ }
+ if _, err = conn.bufWriter.Write([]byte("!\n")); err != nil {
+ return
+ }
+ if err = conn.bufWriter.Flush(); err != nil {
+ return
+ }
+
+ ret, _, err := conn.bufReader.ReadLine()
+ if err != nil {
+ glog.V(0).Infof("upload by tcp: %v", err)
+ return
+ }
+ if !bytes.HasPrefix(ret, []byte("+OK")) {
+ glog.V(0).Infof("upload by tcp: %v", string(ret))
+ }
+
+ return nil
+}
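
For reference, the wire exchange implemented above is line-oriented: the client sends "+<fileId>\n", then the 4-byte size (encoded with util.Uint32toBytes), then exactly fileSize payload bytes, then a "!\n" trailer, and expects a reply line starting with "+OK" (the server side is the TCP handler in weed/server/volume_server_tcp_handlers_write.go from this same change). A usage sketch with an illustrative address and fid; the TCP endpoint is derived via pb.ParseServerAddress(..., 20000) as in the code above:

    package main

    import (
        "bytes"

        "github.com/chrislusf/seaweedfs/weed/wdclient"
    )

    func main() {
        client := wdclient.NewVolumeTcpClient()
        data := []byte("hello")
        _ = client.PutFileChunk("localhost:8080", "3,01637037d6", uint32(len(data)), bytes.NewReader(data))
    }
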