aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--k8s/seaweedfs/Chart.yaml2
-rw-r--r--k8s/seaweedfs/values.yaml2
-rw-r--r--weed/Makefile4
-rw-r--r--weed/command/filer.go5
-rw-r--r--weed/command/master.go2
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go3
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store_kv.go16
-rw-r--r--weed/filer/entry.go2
-rw-r--r--weed/filer/filer.go28
-rw-r--r--weed/filer/filer_conf.go20
-rw-r--r--weed/filer/filer_delete_entry.go29
-rw-r--r--weed/filer/filer_on_meta_event.go4
-rw-r--r--weed/filer/filer_rename.go30
-rw-r--r--weed/filer/filerstore.go11
-rw-r--r--weed/filer/filerstore_hardlink.go5
-rw-r--r--weed/filer/read_write.go112
-rw-r--r--weed/filer/reader_at.go13
-rw-r--r--weed/filer/s3iam_conf.go25
-rw-r--r--weed/filer/s3iam_conf_test.go (renamed from weed/s3iam/s3iam_filer_store_test.go)20
-rw-r--r--weed/filesys/dir_rename.go2
-rw-r--r--weed/filesys/filehandle.go4
-rw-r--r--weed/pb/filer_pb/filer_client.go111
-rw-r--r--weed/s3api/auth_credentials.go65
-rw-r--r--weed/s3api/auth_credentials_subscribe.go70
-rw-r--r--weed/s3api/auth_credentials_test.go1
-rw-r--r--weed/s3api/s3_constants/s3_actions.go10
-rw-r--r--weed/s3api/s3api_objects_list_handlers.go61
-rw-r--r--weed/s3api/s3api_server.go5
-rw-r--r--weed/s3iam/s3iam_filer_store.go95
-rw-r--r--weed/server/filer_grpc_server.go2
-rw-r--r--weed/server/filer_grpc_server_rename.go11
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go9
-rw-r--r--weed/server/filer_server.go1
-rw-r--r--weed/server/filer_server_handlers_read.go10
-rw-r--r--weed/server/filer_server_handlers_write.go9
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go3
-rw-r--r--weed/server/master_server.go8
-rw-r--r--weed/shell/command_fs_configure.go38
-rw-r--r--weed/shell/command_s3_configure.go39
-rw-r--r--weed/storage/needle/volume_ttl.go37
-rw-r--r--weed/storage/needle/volume_ttl_test.go2
-rw-r--r--weed/storage/store.go3
-rw-r--r--weed/storage/volume.go4
-rw-r--r--weed/storage/volume_loading.go18
-rw-r--r--weed/storage/volume_read_write.go2
-rw-r--r--weed/util/config.go1
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/network.go22
51 files changed, 630 insertions, 353 deletions
diff --git a/go.mod b/go.mod
index ab3da6f20..1ecbfd2a9 100644
--- a/go.mod
+++ b/go.mod
@@ -58,7 +58,7 @@ require (
github.com/prometheus/client_golang v1.3.0
github.com/rakyll/statik v0.1.7
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
- github.com/seaweedfs/fuse v1.0.7
+ github.com/seaweedfs/fuse v1.0.8
github.com/seaweedfs/goexif v1.0.2
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/spaolacci/murmur3 v1.1.0 // indirect
diff --git a/go.sum b/go.sum
index 51d96d997..387d2a8ac 100644
--- a/go.sum
+++ b/go.sum
@@ -549,6 +549,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seaweedfs/fuse v1.0.7 h1:tESMXhI3gXzN+dlWsCUrkIZDiWA4dZX18rQMoqmvazw=
github.com/seaweedfs/fuse v1.0.7/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8=
+github.com/seaweedfs/fuse v1.0.8 h1:HBPJTC77OlxwSd2JiTwvLPn8bWTElqQp3xs9vf3C15s=
+github.com/seaweedfs/fuse v1.0.8/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8=
github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E=
github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk=
github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw=
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml
index 236751f4c..ea3317e61 100644
--- a/k8s/seaweedfs/Chart.yaml
+++ b/k8s/seaweedfs/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
-version: 2.13 \ No newline at end of file
+version: 2.14 \ No newline at end of file
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 3e5dd0dca..72bdb684a 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
- imageTag: "2.13"
+ imageTag: "2.15"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
diff --git a/weed/Makefile b/weed/Makefile
index cedde7847..33886215a 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -29,3 +29,7 @@ debug_volume:
debug_webdav:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 webdav
+
+debug_s3:
+ go build -gcflags="all=-N -l"
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3 \ No newline at end of file
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 017427335..e72056893 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -42,6 +42,7 @@ type FilerOptions struct {
cipher *bool
peers *string
metricsHttpPort *int
+ cacheToFilerLimit *int
// default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
@@ -65,6 +66,7 @@ func init() {
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
+ f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -89,7 +91,7 @@ var cmdFiler = &Command{
//return a json format subdirectory and files listing
GET /path/to/
- The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order.
+ The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
@@ -149,6 +151,7 @@ func (fo *FilerOptions) startFiler() {
Host: *fo.ip,
Port: uint32(*fo.port),
Cipher: *fo.cipher,
+ CacheToFilerLimit: int64(*fo.cacheToFilerLimit),
Filers: peers,
})
if nfs_err != nil {
diff --git a/weed/command/master.go b/weed/command/master.go
index c03da7f5d..d569919cd 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -69,7 +69,7 @@ var cmdMaster = &Command{
Short: "start a master server",
Long: `start a master server to provide volume=>location mapping service and sequence number of file ids
- The configuration file "security.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order.
+ The configuration file "security.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
The example security.toml configuration file can be generated by "weed scaffold -config=security"
diff --git a/weed/command/server.go b/weed/command/server.go
index 0c6731eb2..7e63f8e8a 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -94,6 +94,7 @@ func init() {
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
+ filerOptions.cacheToFilerLimit = cmdServer.Flag.Int("filer.cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 7c95ffb57..da104358b 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -77,7 +77,8 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
}
if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
- return fmt.Errorf("kv insert: %s", err)
+ // return fmt.Errorf("insert: %s", err)
+ // skip this since the error can be in a different language
}
// now the insert failed possibly due to duplication constraints
diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go
index c368059df..81d105134 100644
--- a/weed/filer/abstract_sql/abstract_sql_store_kv.go
+++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go
@@ -5,11 +5,10 @@ import (
"database/sql"
"encoding/base64"
"fmt"
- "strings"
-
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "strings"
)
func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
@@ -17,10 +16,13 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
dirStr, dirHash, name := genDirAndName(key)
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value)
- if err != nil {
- if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
- return fmt.Errorf("kv insert: %s", err)
- }
+ if err == nil {
+ return
+ }
+
+ if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
+ // return fmt.Errorf("kv insert: %s", err)
+ // skip this since the error can be in a different language
}
// now the insert failed possibly due to duplication constraints
@@ -32,7 +34,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
}
_, err = res.RowsAffected()
- if err != nil {
+ if err != nil {
return fmt.Errorf("kv upsert no rows affected: %s", err)
}
return nil
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index d2f257967..dbe10c9b1 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -44,7 +44,7 @@ type Entry struct {
}
func (entry *Entry) Size() uint64 {
- return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
+ return maxUint64(maxUint64(TotalSize(entry.Chunks), entry.FileSize), uint64(len(entry.Content)))
}
func (entry *Entry) Timestamp() time.Time {
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 8319212f1..800dd35dc 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -18,7 +18,7 @@ import (
const (
LogFlushInterval = time.Minute
- PaginationSize = 1024 * 256
+ PaginationSize = 1024
FilerStoreId = "filer.store.id"
)
@@ -251,21 +251,23 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
return f.Store.UpdateEntry(ctx, entry)
}
-func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
+var (
+ Root = &Entry{
+ FullPath: "/",
+ Attr: Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.ModeDir | 0755,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+)
- now := time.Now()
+func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
if string(p) == "/" {
- return &Entry{
- FullPath: p,
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | 0755,
- Uid: OS_UID,
- Gid: OS_GID,
- },
- }, nil
+ return Root, nil
}
entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 {
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 0328fdbff..18ed37abd 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -9,13 +9,15 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/jsonpb"
- "github.com/golang/protobuf/proto"
"github.com/viant/ptrie"
)
const (
- DirectoryEtc = "/etc"
- FilerConfName = "filer.conf"
+ DirectoryEtcRoot = "/etc"
+ DirectoryEtcSeaweedFS = "/etc/seaweedfs"
+ FilerConfName = "filer.conf"
+ IamConfigDirecotry = "/etc/iam"
+ IamIdentityFile = "identity.json"
)
type FilerConf struct {
@@ -30,7 +32,7 @@ func NewFilerConf() (fc *FilerConf) {
}
func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) {
- filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName)
+ filerConfPath := util.NewFullPath(DirectoryEtcSeaweedFS, FilerConfName)
entry, err := filer.FindEntry(context.Background(), filerConfPath)
if err != nil {
if err == filer_pb.ErrNotFound {
@@ -61,15 +63,7 @@ func (fc *FilerConf) LoadFromBytes(data []byte) (err error) {
conf := &filer_pb.FilerConf{}
if err := jsonpb.Unmarshal(bytes.NewReader(data), conf); err != nil {
-
- err = proto.UnmarshalText(string(data), conf)
- if err != nil {
- glog.Errorf("unable to parse filer conf: %v", err)
- // this is not recoverable
- return nil
- }
-
- return nil
+ return err
}
return fc.doLoadConf(conf)
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index b4f4e46ff..da92c4f4b 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -3,8 +3,6 @@ package filer
import (
"context"
"fmt"
- "strings"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -60,11 +58,6 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
collectionName := entry.Name()
f.doDeleteCollection(collectionName)
f.deleteBucket(collectionName)
- } else {
- parent, _ := p.DirAndName()
- if err := f.removeEmptyParentFolder(ctx, util.FullPath(parent)); err != nil {
- glog.Errorf("clean up empty folders for %s : %v", p, err)
- }
}
return nil
@@ -159,25 +152,3 @@ func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) {
}
}
}
-
-func (f *Filer) removeEmptyParentFolder(ctx context.Context, dir util.FullPath) error {
- if !strings.HasPrefix(string(dir), f.DirBucketsPath) {
- return nil
- }
- parent, _ := dir.DirAndName()
- if parent == f.DirBucketsPath {
- // should not delete bucket itself
- return nil
- }
- entries, err := f.ListDirectoryEntries(ctx, dir, "", false, 1, "")
- if err != nil {
- return err
- }
- if len(entries) > 0 {
- return nil
- }
- if err := f.Store.DeleteEntry(ctx, dir); err != nil {
- return err
- }
- return f.removeEmptyParentFolder(ctx, util.FullPath(parent))
-}
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index 3de27da6e..d93a7e9e3 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -11,8 +11,8 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
- if DirectoryEtc != event.Directory {
- if DirectoryEtc != event.EventNotification.NewParentPath {
+ if DirectoryEtcSeaweedFS != event.Directory {
+ if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath {
return
}
}
diff --git a/weed/filer/filer_rename.go b/weed/filer/filer_rename.go
new file mode 100644
index 000000000..b6f0cf6de
--- /dev/null
+++ b/weed/filer/filer_rename.go
@@ -0,0 +1,30 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "strings"
+)
+
+func (f *Filer) CanRename(source, target util.FullPath) error {
+ sourceBucket := f.DetectBucket(source)
+ targetBucket := f.DetectBucket(target)
+ if sourceBucket != targetBucket {
+ return fmt.Errorf("can not move across collection %s => %s", sourceBucket, targetBucket)
+ }
+ return nil
+}
+
+func (f *Filer) DetectBucket(source util.FullPath) (bucket string) {
+ if strings.HasPrefix(string(source), f.DirBucketsPath+"/") {
+ bucketAndObjectKey := string(source)[len(f.DirBucketsPath)+1:]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 {
+ bucket = bucketAndObjectKey
+ }
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ }
+ }
+ return bucket
+}
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 3ad7a787e..4b28c3021 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"errors"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"strings"
"time"
@@ -85,6 +86,7 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err
return err
}
+ glog.V(4).Infof("InsertEntry %s", entry.FullPath)
return fsw.ActualStore.InsertEntry(ctx, entry)
}
@@ -104,6 +106,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
return err
}
+ glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
return fsw.ActualStore.UpdateEntry(ctx, entry)
}
@@ -114,6 +117,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
}()
+ glog.V(4).Infof("FindEntry %s", fp)
entry, err = fsw.ActualStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
@@ -138,11 +142,13 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
}
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
+ glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
+ glog.V(4).Infof("DeleteEntry %s", fp)
return fsw.ActualStore.DeleteEntry(ctx, fp)
}
@@ -155,11 +161,13 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry
if len(existingEntry.HardLinkId) != 0 {
// remove hard link
+ glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
}
+ glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
@@ -170,6 +178,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
+ glog.V(4).Infof("DeleteFolderChildren %s", fp)
return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
}
@@ -180,6 +189,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
+ glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
@@ -197,6 +207,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
+ glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
if err == ErrUnsupportedListDirectoryPrefixed {
entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go
index 0fbf8310e..c6b3734b0 100644
--- a/weed/filer/filerstore_hardlink.go
+++ b/weed/filer/filerstore_hardlink.go
@@ -18,6 +18,7 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry
}
// check what is existing entry
+ glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
if err != nil && err != filer_pb.ErrNotFound {
return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
@@ -25,6 +26,7 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry
// remove old hard link
if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
+ glog.V(4).Infof("handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
return err
}
@@ -52,6 +54,7 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr
}
key := entry.HardLinkId
+ glog.V(4).Infof("maybeReadHardLink KvGet %v", key)
value, err := fsw.KvGet(ctx, key)
if err != nil {
glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
@@ -83,6 +86,7 @@ func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId Har
entry.HardLinkCounter--
if entry.HardLinkCounter <= 0 {
+ glog.V(4).Infof("DeleteHardLink KvDelete %v", key)
return fsw.KvDelete(ctx, key)
}
@@ -91,6 +95,7 @@ func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId Har
return encodeErr
}
+ glog.V(4).Infof("DeleteHardLink KvPut %v", key)
return fsw.KvPut(ctx, key, newBlob)
}
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
new file mode 100644
index 000000000..548473116
--- /dev/null
+++ b/weed/filer/read_write.go
@@ -0,0 +1,112 @@
+package filer
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "io/ioutil"
+ "math"
+ "net/http"
+ "time"
+)
+
+func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.SeaweedFilerClient, dir, name string, byteBuffer *bytes.Buffer) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: dir,
+ Name: name,
+ }
+ respLookupEntry, err := filer_pb.LookupEntry(filerClient, request)
+ if err != nil {
+ return err
+ }
+ if len(respLookupEntry.Entry.Content) > 0 {
+ _, err = byteBuffer.Write(respLookupEntry.Entry.Content)
+ return err
+ }
+
+ return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+
+}
+
+func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
+
+ target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
+
+ data, _, err := util.Get(target)
+
+ return data, err
+}
+
+func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error {
+
+ target := fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name)
+
+ // set the HTTP method, url, and request body
+ req, err := http.NewRequest(http.MethodPut, target, byteBuffer)
+ if err != nil {
+ return err
+ }
+
+ // set the request header Content-Type for json
+ if contentType != "" {
+ req.Header.Set("Content-Type", contentType)
+ }
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer util.CloseResponse(resp)
+
+ b, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+
+ if resp.StatusCode >= 400 {
+ return fmt.Errorf("%s: %s %v", target, resp.Status, string(b))
+ }
+
+ return nil
+
+}
+
+func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
+
+ resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: dir,
+ Name: name,
+ })
+
+ if err == filer_pb.ErrNotFound {
+ err = filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: false,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0644),
+ Collection: "",
+ Replication: "",
+ FileSize: uint64(len(content)),
+ },
+ Content: content,
+ },
+ })
+ } else if err == nil {
+ entry := resp.Entry
+ entry.Content = content
+ entry.Attributes.Mtime = time.Now().Unix()
+ entry.Attributes.FileSize = uint64(len(content))
+ err = filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ }
+
+ return err
+} \ No newline at end of file
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index ccc746b90..6193dbd45 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -23,12 +23,13 @@ type ChunkReadAt struct {
fileSize int64
fetchGroup singleflight.Group
+ chunkCache chunk_cache.ChunkCache
lastChunkFileId string
lastChunkData []byte
- chunkCache chunk_cache.ChunkCache
}
-// var _ = io.ReaderAt(&ChunkReadAt{})
+var _ = io.ReaderAt(&ChunkReadAt{})
+var _ = io.Closer(&ChunkReadAt{})
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
@@ -96,6 +97,12 @@ func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*
}
}
+func (c *ChunkReadAt) Close() error {
+ c.lastChunkData = nil
+ c.lastChunkFileId = ""
+ return nil
+}
+
func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
@@ -107,7 +114,6 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
- var buffer []byte
startOffset, remaining := offset, int64(len(p))
var nextChunk *ChunkView
for i, chunk := range c.chunkViews {
@@ -134,6 +140,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
continue
}
glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+ var buffer []byte
buffer, err = c.readFromWholeChunkData(chunk, nextChunk)
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
new file mode 100644
index 000000000..92387fb09
--- /dev/null
+++ b/weed/filer/s3iam_conf.go
@@ -0,0 +1,25 @@
+package filer
+
+import (
+ "bytes"
+ "github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
+ "github.com/golang/protobuf/jsonpb"
+ "io"
+)
+
+func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfiguration) error {
+ if err := jsonpb.Unmarshal(bytes.NewBuffer(content), config); err != nil {
+ return err
+ }
+ return nil
+}
+
+func S3ConfigurationToText(writer io.Writer, config *iam_pb.S3ApiConfiguration) error {
+
+ m := jsonpb.Marshaler{
+ EmitDefaults: false,
+ Indent: " ",
+ }
+
+ return m.Marshal(writer, config)
+}
diff --git a/weed/s3iam/s3iam_filer_store_test.go b/weed/filer/s3iam_conf_test.go
index 6c595134e..65cc49840 100644
--- a/weed/s3iam/s3iam_filer_store_test.go
+++ b/weed/filer/s3iam_conf_test.go
@@ -1,24 +1,16 @@
-package s3iam
+package filer
import (
+ "bytes"
+ . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"testing"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
"github.com/stretchr/testify/assert"
)
-const (
- ACTION_READ = "Read"
- ACTION_WRITE = "Write"
- ACTION_ADMIN = "Admin"
- ACTION_TAGGING = "Tagging"
- ACTION_LIST = "List"
-)
-
func TestS3Conf(t *testing.T) {
- ifs := &IAMFilerStore{}
s3Conf := &iam_pb.S3ApiConfiguration{
Identities: []*iam_pb.Identity{
{
@@ -51,11 +43,11 @@ func TestS3Conf(t *testing.T) {
},
},
}
- entry := filer_pb.Entry{}
- err := ifs.saveIAMConfigToEntry(&entry, s3Conf)
+ var buf bytes.Buffer
+ err := S3ConfigurationToText(&buf, s3Conf)
assert.Equal(t, err, nil)
s3ConfSaved := &iam_pb.S3ApiConfiguration{}
- err = ifs.loadIAMConfigFromEntry(&entry, s3ConfSaved)
+ err = ParseS3ConfigurationFromBytes(buf.Bytes(), s3ConfSaved)
assert.Equal(t, err, nil)
assert.Equal(t, "some_name", s3ConfSaved.Identities[0].Name)
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index 3f73d0eb6..d2acad4b2 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -42,7 +42,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
_, err := client.AtomicRenameEntry(ctx, request)
if err != nil {
glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
+ return fuse.EXDEV
}
return nil
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index abe77f063..c273eec8a 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -202,6 +202,10 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
}
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ if closer, ok := fh.f.reader.(io.Closer); ok {
+ closer.Close()
+ }
+ fh.f.reader = nil
}
return nil
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 96a716d5b..079fbd671 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
- "math"
"os"
"strings"
"time"
@@ -61,64 +60,88 @@ type EachEntryFunciton func(entry *Entry, isLast bool) error
func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
- return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
+ var counter uint32
+ var startFrom string
+ var counterFunc = func(entry *Entry, isLast bool) error {
+ counter++
+ startFrom = entry.Name
+ return fn(entry, isLast)
+ }
-}
+ var paginationLimit uint32 = 10000
-func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
+ if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
+ return err
+ }
- return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
+ for counter == paginationLimit {
+ counter = 0
+ if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
+ return err
+ }
+ }
+ return nil
}
-func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
+func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
+ return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
+ })
+}
- err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
+ return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit)
+ })
+}
- request := &ListEntriesRequest{
- Directory: string(fullDirPath),
- Prefix: prefix,
- StartFromFileName: startFrom,
- Limit: limit,
- InclusiveStartFrom: inclusive,
- }
+func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
+ return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
+}
- glog.V(4).Infof("read directory: %v", request)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
-
- var prevEntry *Entry
- for {
- resp, recvErr := stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- if prevEntry != nil {
- if err := fn(prevEntry, true); err != nil {
- return err
- }
+func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
+
+ request := &ListEntriesRequest{
+ Directory: string(fullDirPath),
+ Prefix: prefix,
+ StartFromFileName: startFrom,
+ Limit: limit,
+ InclusiveStartFrom: inclusive,
+ }
+
+ glog.V(4).Infof("read directory: %v", request)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.ListEntries(ctx, request)
+ if err != nil {
+ return fmt.Errorf("list %s: %v", fullDirPath, err)
+ }
+
+ var prevEntry *Entry
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ if prevEntry != nil {
+ if err := fn(prevEntry, true); err != nil {
+ return err
}
- break
- } else {
- return recvErr
}
+ break
+ } else {
+ return recvErr
}
- if prevEntry != nil {
- if err := fn(prevEntry, false); err != nil {
- return err
- }
+ }
+ if prevEntry != nil {
+ if err := fn(prevEntry, false); err != nil {
+ return err
}
- prevEntry = resp.Entry
}
+ prevEntry = resp.Entry
+ }
- return nil
-
- })
-
- return
+ return nil
}
func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index de1a0e3a1..da0a38dbf 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -1,33 +1,19 @@
package s3api
import (
- "bytes"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"io/ioutil"
"net/http"
- xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
- "github.com/chrislusf/seaweedfs/weed/s3iam"
- "github.com/golang/protobuf/jsonpb"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
)
type Action string
-const (
- ACTION_READ = "Read"
- ACTION_WRITE = "Write"
- ACTION_ADMIN = "Admin"
- ACTION_TAGGING = "Tagging"
- ACTION_LIST = "List"
-)
-
type Iam interface {
Check(f http.HandlerFunc, actions ...Action) http.HandlerFunc
}
@@ -52,44 +38,40 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
iam := &IdentityAccessManagement{
domain: option.DomainName,
}
- if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil {
- glog.Warningf("fail to load config %v", err)
- }
- if len(iam.identities) == 0 && option.Config != "" {
+ if option.Config != "" {
if err := iam.loadS3ApiConfigurationFromFile(option.Config); err != nil {
glog.Fatalf("fail to load config file %s: %v", option.Config, err)
}
+ } else {
+ if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil {
+ glog.Warningf("fail to load config: %v", err)
+ }
}
return iam
}
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error {
- s3ApiConfiguration := &iam_pb.S3ApiConfiguration{}
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- store := s3iam.NewIAMFilerStore(&client)
- if err := store.LoadIAMConfig(s3ApiConfiguration); err != nil {
- return nil
- }
- if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil {
- return err
- }
- return nil
- }, option.FilerGrpcAddress, option.GrpcDialOption)
+ content, err := filer.ReadContent(option.Filer, filer.IamConfigDirecotry, filer.IamIdentityFile)
+ if err != nil {
+ return fmt.Errorf("read S3 config: %v", err)
+ }
+ return iam.loadS3ApiConfigurationFromBytes(content)
}
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error {
- s3ApiConfiguration := &iam_pb.S3ApiConfiguration{}
- rawData, readErr := ioutil.ReadFile(fileName)
+ content, readErr := ioutil.ReadFile(fileName)
if readErr != nil {
glog.Warningf("fail to read %s : %v", fileName, readErr)
return fmt.Errorf("fail to read %s : %v", fileName, readErr)
}
+ return iam.loadS3ApiConfigurationFromBytes(content)
+}
- glog.V(1).Infof("load s3 config: %v", fileName)
- if err := jsonpb.Unmarshal(bytes.NewReader(rawData), s3ApiConfiguration); err != nil {
+func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []byte) error {
+ s3ApiConfiguration := &iam_pb.S3ApiConfiguration{}
+ if err := filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil {
glog.Warningf("unmarshal error: %v", err)
- return fmt.Errorf("unmarshal %s error: %v", fileName, err)
+ return fmt.Errorf("unmarshal error: %v", err)
}
if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil {
return err
@@ -97,7 +79,9 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName str
return nil
}
+
func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error {
+ var identities []*Identity
for _, ident := range config.Identities {
t := &Identity{
Name: ident.Name,
@@ -113,8 +97,11 @@ func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3Api
SecretKey: cred.SecretKey,
})
}
- iam.identities = append(iam.identities, t)
+ identities = append(identities, t)
}
+
+	// swap in the new list wholesale (NOTE(review): a plain assignment, not an atomic/lock-protected
+ iam.identities = identities
return nil
}
diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go
new file mode 100644
index 000000000..ea4b69550
--- /dev/null
+++ b/weed/s3api/auth_credentials_subscribe.go
@@ -0,0 +1,70 @@
+package s3api
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
+ "time"
+)
+
+func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+
+ message := resp.EventNotification
+ if message.NewEntry == nil {
+ return nil
+ }
+
+ dir := resp.Directory
+
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile {
+ if err := s3a.iam.loadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil {
+ return err
+ }
+ glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile)
+ }
+
+ return nil
+ }
+
+ for {
+ err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: clientName,
+ PathPrefix: prefix,
+ SinceNs: lastTsNs,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ glog.Fatalf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
+ }
+ })
+ if err != nil {
+ glog.Errorf("subscribing filer meta change: %v", err)
+ }
+ time.Sleep(time.Second)
+ }
+}
diff --git a/weed/s3api/auth_credentials_test.go b/weed/s3api/auth_credentials_test.go
index c6f76560c..0383ddbcd 100644
--- a/weed/s3api/auth_credentials_test.go
+++ b/weed/s3api/auth_credentials_test.go
@@ -1,6 +1,7 @@
package s3api
import (
+ . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"testing"
"github.com/golang/protobuf/jsonpb"
diff --git a/weed/s3api/s3_constants/s3_actions.go b/weed/s3api/s3_constants/s3_actions.go
new file mode 100644
index 000000000..f6056ef78
--- /dev/null
+++ b/weed/s3api/s3_constants/s3_actions.go
@@ -0,0 +1,10 @@
+package s3_constants
+
+const (
+ ACTION_READ = "Read"
+ ACTION_WRITE = "Write"
+ ACTION_ADMIN = "Admin"
+ ACTION_TAGGING = "Tagging"
+ ACTION_LIST = "List"
+)
+
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 5d63f1039..dce2fd6b0 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/xml"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"io"
"net/http"
"net/url"
@@ -197,11 +198,12 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
sepIndex := strings.Index(marker, "/")
subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
// println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
- subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
+ subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
if subErr != nil {
err = subErr
return
}
+ isTruncated = isTruncated || subIsTruncated
maxKeys -= subCounter
nextMarker = subDir + "/" + subNextMarker
counter += subCounter
@@ -245,8 +247,8 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
if entry.IsDirectory {
// println("ListEntries", dir, "dir:", entry.Name)
if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
- eachEntryFn(dir, entry)
if delimiter != "/" {
+ eachEntryFn(dir, entry)
// println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn)
if subErr != nil {
@@ -261,7 +263,14 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
return
}
} else {
- counter++
+ var isEmpty bool
+ if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
+ return
+ }
+ if !isEmpty {
+ eachEntryFn(dir, entry)
+ counter++
+ }
}
}
} else {
@@ -298,3 +307,49 @@ func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string,
}
return
}
+
+func (s3a *S3ApiServer) isDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerClient, parentDir, name string) (isEmpty bool, err error) {
+ // println("+ isDirectoryAllEmpty", dir, name)
+ var fileCounter int
+ var subDirs []string
+ currentDir := parentDir + "/" + name
+ var startFrom string
+ var isExhausted bool
+ for fileCounter == 0 && !isExhausted {
+ err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.IsDirectory {
+ subDirs = append(subDirs, entry.Name)
+ } else {
+ fileCounter++
+ }
+ startFrom = entry.Name
+ isExhausted = isExhausted || isLast
+ return nil
+ }, startFrom, false, 8)
+ }
+
+ if err != nil {
+ return false, err
+ }
+
+ if fileCounter > 0 {
+ return false, nil
+ }
+
+ for _, subDir := range subDirs {
+ isSubEmpty, subErr := s3a.isDirectoryAllEmpty(filerClient, currentDir, subDir)
+ if subErr != nil {
+ return false, subErr
+ }
+ if !isSubEmpty {
+ return false, nil
+ }
+ }
+
+ glog.V(1).Infof("deleting empty folder %s", currentDir)
+ if err = doDeleteEntry(filerClient, parentDir, name, true, true); err != nil {
+ return
+ }
+
+ return true, nil
+}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 850a02171..93e2bb575 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -2,8 +2,11 @@ package s3api
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"net/http"
"strings"
+ "time"
"github.com/gorilla/mux"
"google.golang.org/grpc"
@@ -32,6 +35,8 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
s3ApiServer.registerRouter(router)
+ go s3ApiServer.subscribeMetaEvents("s3", filer.IamConfigDirecotry+"/"+filer.IamIdentityFile, time.Now().UnixNano())
+
return s3ApiServer, nil
}
diff --git a/weed/s3iam/s3iam_filer_store.go b/weed/s3iam/s3iam_filer_store.go
deleted file mode 100644
index 4f84a0e54..000000000
--- a/weed/s3iam/s3iam_filer_store.go
+++ /dev/null
@@ -1,95 +0,0 @@
-package s3iam
-
-import (
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
- "time"
-
- proto "github.com/golang/protobuf/proto"
-)
-
-const (
- iamConfigPrefix = "/etc/iam"
- iamIdentityFile = "identity.json"
-)
-
-type IAMFilerStore struct {
- client *filer_pb.SeaweedFilerClient
-}
-
-func NewIAMFilerStore(client *filer_pb.SeaweedFilerClient) *IAMFilerStore {
- return &IAMFilerStore{client: client}
-}
-
-func (ifs *IAMFilerStore) getIAMConfigRequest() *filer_pb.LookupDirectoryEntryRequest {
- return &filer_pb.LookupDirectoryEntryRequest{
- Directory: iamConfigPrefix,
- Name: iamIdentityFile,
- }
-}
-
-func (ifs *IAMFilerStore) LoadIAMConfig(config *iam_pb.S3ApiConfiguration) error {
- resp, err := filer_pb.LookupEntry(*ifs.client, ifs.getIAMConfigRequest())
- if err != nil {
- return err
- }
- err = ifs.loadIAMConfigFromEntry(resp.Entry, config)
- if err != nil {
- return err
- }
- return nil
-}
-
-func (ifs *IAMFilerStore) SaveIAMConfig(config *iam_pb.S3ApiConfiguration) error {
- entry := &filer_pb.Entry{
- Name: iamIdentityFile,
- IsDirectory: false,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(0644),
- Collection: "",
- Replication: "",
- },
- Content: []byte{},
- }
- err := ifs.saveIAMConfigToEntry(entry, config)
- if err != nil {
- return err
- }
- _, err = filer_pb.LookupEntry(*ifs.client, ifs.getIAMConfigRequest())
- if err == filer_pb.ErrNotFound {
- err = filer_pb.CreateEntry(*ifs.client, &filer_pb.CreateEntryRequest{
- Directory: iamConfigPrefix,
- Entry: entry,
- IsFromOtherCluster: false,
- Signatures: nil,
- })
- } else {
- err = filer_pb.UpdateEntry(*ifs.client, &filer_pb.UpdateEntryRequest{
- Directory: iamConfigPrefix,
- Entry: entry,
- IsFromOtherCluster: false,
- Signatures: nil,
- })
- }
- if err != nil {
- return err
- }
- return nil
-}
-
-func (ifs *IAMFilerStore) loadIAMConfigFromEntry(entry *filer_pb.Entry, config *iam_pb.S3ApiConfiguration) error {
- if err := proto.Unmarshal(entry.Content, config); err != nil {
- return err
- }
- return nil
-}
-
-func (ifs *IAMFilerStore) saveIAMConfigToEntry(entry *filer_pb.Entry, config *iam_pb.S3ApiConfiguration) (err error) {
- entry.Content, err = proto.Marshal(config)
- if err != nil {
- return err
- }
- return nil
-}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 46e5c5957..5f1b2d819 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -170,6 +170,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
Extended: req.Entry.Extended,
HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
HardLinkCounter: req.Entry.HardLinkCounter,
+ Content: req.Entry.Content,
}, req.OExcl, req.IsFromOtherCluster, req.Signatures)
if createErr == nil {
@@ -204,6 +205,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
Chunks: chunks,
HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
HardLinkCounter: req.Entry.HardLinkCounter,
+ Content: req.Entry.Content,
}
glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 391efb793..fa86737ac 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -15,13 +15,18 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
glog.V(1).Infof("AtomicRenameEntry %v", req)
+ oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
+ newParent := util.FullPath(filepath.ToSlash(req.NewDirectory))
+
+ if err := fs.filer.CanRename(oldParent, newParent); err != nil {
+ return nil, err
+ }
+
ctx, err := fs.filer.BeginTransaction(ctx)
if err != nil {
return nil, err
}
- oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
-
oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
if err != nil {
fs.filer.RollbackTransaction(ctx)
@@ -29,7 +34,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
}
var events MoveEvents
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, util.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events)
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, &events)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 634fb5211..3b8ced675 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -153,7 +153,14 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
}
if !strings.HasPrefix(fullpath, req.PathPrefix) {
- return nil
+ if eventNotification.NewParentPath != "" {
+ newFullPath := util.Join(eventNotification.NewParentPath, entryName)
+ if !strings.HasPrefix(newFullPath, req.PathPrefix) {
+ return nil
+ }
+ } else {
+ return nil
+ }
}
message := &filer_pb.SubscribeMetadataResponse{
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 461c08aad..d04053df5 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -55,6 +55,7 @@ type FilerOption struct {
Port uint32
recursiveDelete bool
Cipher bool
+ CacheToFilerLimit int64
Filers []string
}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index f77b7f08d..d55bf7cbb 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -60,7 +60,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
- if len(entry.Chunks) == 0 {
+ if len(entry.Chunks) == 0 && len(entry.Content) == 0 {
glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
w.WriteHeader(http.StatusNoContent)
@@ -123,13 +123,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
filename := entry.Name()
adjustHeaderContentDisposition(w, r, filename)
+ totalSize := int64(entry.Size())
+
if r.Method == "HEAD" {
- w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10))
+ w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return
}
- totalSize := int64(entry.Size())
-
if rangeReq := r.Header.Get("Range"); rangeReq == "" {
ext := filepath.Ext(filename)
width, height, mode, shouldResize := shouldResizeImages(ext, r)
@@ -148,7 +148,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if offset+size <= int64(len(entry.Content)) {
- _, err := writer.Write(entry.Content[offset:offset+size])
+ _, err := writer.Write(entry.Content[offset : offset+size])
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index c2d92f8ba..9131b042b 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -111,14 +111,7 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
// required by buckets folder
bucketDefaultReplication, fsync := "", false
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
- bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
- t := strings.Index(bucketAndObjectKey, "/")
- if t < 0 {
- collection = bucketAndObjectKey
- }
- if t > 0 {
- collection = bucketAndObjectKey[:t]
- }
+ collection = fs.filer.DetectBucket(util.FullPath(requestURI))
bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
}
if replication == "" {
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 4d8a4d44c..0f6d0dc47 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -236,7 +236,8 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
break
}
}
- if chunkOffset < 2048 {
+
+ if chunkOffset < fs.option.CacheToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && chunkOffset < 4*1024 {
smallContent = content
}
return fileChunks, md5Hash, chunkOffset, nil, smallContent
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 58a991876..9404081b4 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -55,7 +55,7 @@ type MasterServer struct {
vg *topology.VolumeGrowth
vgLock sync.Mutex
- bounedLeaderChan chan int
+ boundedLeaderChan chan int
// notifying clients
clientChansLock sync.RWMutex
@@ -96,7 +96,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
adminLocks: NewAdminLocks(),
}
- ms.bounedLeaderChan = make(chan int, 16)
+ ms.boundedLeaderChan = make(chan int, 16)
seq := ms.createSequencer(option)
if nil == seq {
@@ -157,8 +157,8 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
if ms.Topo.IsLeader() {
f(w, r)
} else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
- ms.bounedLeaderChan <- 1
- defer func() { <-ms.bounedLeaderChan }()
+ ms.boundedLeaderChan <- 1
+ defer func() { <-ms.boundedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index d530b2585..497ef4f9e 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -5,14 +5,11 @@ import (
"flag"
"fmt"
"io"
- "math"
- "net/http"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -65,25 +62,16 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
var buf bytes.Buffer
if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: filer.DirectoryEtc,
- Name: filer.FilerConfName,
- }
- respLookupEntry, err := filer_pb.LookupEntry(client, request)
- if err != nil {
- return err
- }
-
- return filer.StreamContent(commandEnv.MasterClient, &buf, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
-
- }); err != nil {
+ return filer.ReadEntry(commandEnv.MasterClient, client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, &buf)
+ }); err != nil && err != filer_pb.ErrNotFound {
return err
}
fc := filer.NewFilerConf()
- if err = fc.LoadFromBytes(buf.Bytes()); err != nil {
- return err
+ if buf.Len() > 0 {
+ if err = fc.LoadFromBytes(buf.Bytes()); err != nil {
+ return err
+ }
}
if *locationPrefix != "" {
@@ -128,21 +116,9 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- target := fmt.Sprintf("http://%s:%d%s/%s", commandEnv.option.FilerHost, commandEnv.option.FilerPort, filer.DirectoryEtc, filer.FilerConfName)
-
- // set the HTTP method, url, and request body
- req, err := http.NewRequest(http.MethodPut, target, &buf)
- if err != nil {
- return err
- }
-
- // set the request header Content-Type for json
- req.Header.Set("Content-Type", "text/plain; charset=utf-8")
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
+ if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.DirectoryEtcSeaweedFS, filer.FilerConfName, "text/plain; charset=utf-8", &buf); err != nil {
return err
}
- util.CloseResponse(resp)
}
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index c1ac1ce74..869949a25 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -1,15 +1,16 @@
package shell
import (
+ "bytes"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"io"
"sort"
"strings"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
- "github.com/chrislusf/seaweedfs/weed/s3iam"
)
func init() {
@@ -24,15 +25,17 @@ func (c *commandS3Configure) Name() string {
}
func (c *commandS3Configure) Help() string {
- return `configure and apply s3 options for each bucket
+ return `<WIP> configure and apply s3 options for each bucket
+
# see the current configuration file content
s3.configure
`
}
func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
s3ConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- actions := s3ConfigureCommand.String("actions", "", "actions names")
+ actions := s3ConfigureCommand.String("actions", "", "comma separated actions names: Read,Write,List,Tagging,Admin")
user := s3ConfigureCommand.String("user", "", "user name")
buckets := s3ConfigureCommand.String("buckets", "", "bucket name")
accessKey := s3ConfigureCommand.String("access_key", "", "specify the access key")
@@ -44,18 +47,20 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
return nil
}
- s3cfg := &iam_pb.S3ApiConfiguration{}
- ifs := &s3iam.IAMFilerStore{}
+ var buf bytes.Buffer
if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- ifs = s3iam.NewIAMFilerStore(&client)
- if err := ifs.LoadIAMConfig(s3cfg); err != nil {
- return nil
- }
- return nil
- }); err != nil {
+ return filer.ReadEntry(commandEnv.MasterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf)
+ }); err != nil && err != filer_pb.ErrNotFound {
return err
}
+ s3cfg := &iam_pb.S3ApiConfiguration{}
+ if buf.Len() > 0 {
+ if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg); err != nil {
+ return err
+ }
+ }
+
idx := 0
changed := false
if *user != "" {
@@ -158,16 +163,20 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
s3cfg.Identities = append(s3cfg.Identities, &identity)
}
- for _, identity := range s3cfg.Identities {
- fmt.Fprintf(writer, fmt.Sprintf("%+v\n", identity))
- }
+ buf.Reset()
+ filer.S3ConfigurationToText(&buf, s3cfg)
+ fmt.Fprintf(writer, string(buf.Bytes()))
fmt.Fprintln(writer)
if *apply {
- if err := ifs.SaveIAMConfig(s3cfg); err != nil {
+
+ if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes())
+ }); err != nil {
return err
}
+
}
return nil
diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go
index 26ce3b8fd..d0de3768e 100644
--- a/weed/storage/needle/volume_ttl.go
+++ b/weed/storage/needle/volume_ttl.go
@@ -134,7 +134,7 @@ func (t TTL) Minutes() uint32 {
case Week:
return uint32(t.Count) * 60 * 24 * 7
case Month:
- return uint32(t.Count) * 60 * 24 * 31
+ return uint32(t.Count) * 60 * 24 * 30
case Year:
return uint32(t.Count) * 60 * 24 * 365
}
@@ -145,5 +145,38 @@ func SecondsToTTL(seconds int32) string {
if seconds == 0 {
return ""
}
- return fmt.Sprintf("%dm", seconds/60)
+ if seconds%(3600*24*365) == 0 && seconds/(3600*24*365) < 256 {
+ return fmt.Sprintf("%dy", seconds/(3600*24*365))
+ }
+ if seconds%(3600*24*30) == 0 && seconds/(3600*24*30) < 256 {
+ return fmt.Sprintf("%dM", seconds/(3600*24*30))
+ }
+ if seconds%(3600*24*7) == 0 && seconds/(3600*24*7) < 256 {
+ return fmt.Sprintf("%dw", seconds/(3600*24*7))
+ }
+ if seconds%(3600*24) == 0 && seconds/(3600*24) < 256 {
+ return fmt.Sprintf("%dd", seconds/(3600*24))
+ }
+ if seconds%(3600) == 0 && seconds/(3600) < 256 {
+ return fmt.Sprintf("%dh", seconds/(3600))
+ }
+ if seconds/60 < 256 {
+ return fmt.Sprintf("%dm", seconds/60)
+ }
+ if seconds/(3600) < 256 {
+ return fmt.Sprintf("%dh", seconds/(3600))
+ }
+ if seconds/(3600*24) < 256 {
+ return fmt.Sprintf("%dd", seconds/(3600*24))
+ }
+ if seconds/(3600*24*7) < 256 {
+ return fmt.Sprintf("%dw", seconds/(3600*24*7))
+ }
+ if seconds/(3600*24*30) < 256 {
+ return fmt.Sprintf("%dM", seconds/(3600*24*30))
+ }
+ if seconds/(3600*24*365) < 256 {
+ return fmt.Sprintf("%dy", seconds/(3600*24*365))
+ }
+ return ""
}
diff --git a/weed/storage/needle/volume_ttl_test.go b/weed/storage/needle/volume_ttl_test.go
index f75453593..150d06e6e 100644
--- a/weed/storage/needle/volume_ttl_test.go
+++ b/weed/storage/needle/volume_ttl_test.go
@@ -41,7 +41,7 @@ func TestTTLReadWrite(t *testing.T) {
}
ttl, _ = ReadTTL("5M")
- if ttl.Minutes() != 5*31*24*60 {
+ if ttl.Minutes() != 5*30*24*60 {
t.Errorf("5M ttl:%v", ttl)
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index e64dd8fab..83c40a01a 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -225,8 +225,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
if v.lastIoError != nil {
deleteVids = append(deleteVids, v.Id)
- } else {
- glog.Warningf("volume %d has IO error", v.Id)
+ glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
}
}
collectionVolumeSize[v.Collection] += volumeMessage.Size
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index c726e7f11..7712c5eda 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -205,9 +205,9 @@ func (v *Volume) expired(contentSize uint64, volumeSizeLimit uint64) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
- glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
+ glog.V(2).Infof("volume %d now:%v lastModified:%v", v.Id, time.Now().Unix(), v.lastModifiedTsSeconds)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
- glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ glog.V(2).Infof("volume %d ttl:%v lived:%v", v.Id, v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index fe4980e31..34eee876d 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -25,6 +25,20 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
alreadyHasSuperBlock := false
+ hasLoadedVolume := false
+ defer func() {
+ if !hasLoadedVolume {
+ if v.nm != nil {
+ v.nm.Close()
+ v.nm = nil
+ }
+ if v.DataBackend != nil {
+ v.DataBackend.Close()
+ v.DataBackend = nil
+ }
+ }
+ }()
+
hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
if v.HasRemoteFile() {
@@ -151,5 +165,9 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
+ if err == nil {
+ hasLoadedVolume = true
+ }
+
return err
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index c30abf237..f28ee50e6 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -322,7 +322,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if !n.HasLastModifiedDate() {
return bytesRead, nil
}
- if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
return bytesRead, nil
}
return -1, ErrorNotFound
diff --git a/weed/util/config.go b/weed/util/config.go
index 6acf21c12..47f433028 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -22,6 +22,7 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
viper.SetConfigName(configFileName) // name of config file (without extension)
viper.AddConfigPath(".") // optionally look for config in the working directory
viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+	viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory
viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 254f3cb59..52ba08494 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 13)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 15)
COMMIT = ""
)
diff --git a/weed/util/network.go b/weed/util/network.go
index 7108cfea6..55a123667 100644
--- a/weed/util/network.go
+++ b/weed/util/network.go
@@ -7,16 +7,26 @@ import (
)
func DetectedHostAddress() string {
- addrs, err := net.InterfaceAddrs()
+ netInterfaces, err := net.Interfaces()
if err != nil {
- glog.V(0).Infof("failed to detect ip address: %v", err)
+ glog.V(0).Infof("failed to detect net interfaces: %v", err)
return ""
}
- for _, a := range addrs {
- if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
- if ipnet.IP.To4() != nil {
- return ipnet.IP.String()
+ for _, netInterface := range netInterfaces {
+ if (netInterface.Flags & net.FlagUp) == 0 {
+ continue
+ }
+ addrs, err := netInterface.Addrs()
+ if err != nil {
+ glog.V(0).Infof("get interface addresses: %v", err)
+ }
+
+ for _, a := range addrs {
+ if ipNet, ok := a.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
+ if ipNet.IP.To4() != nil {
+ return ipNet.IP.String()
+ }
}
}
}