diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-20 11:03:58 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-20 11:03:58 -0700 |
| commit | aba212deddf381407596ff98e7713f1b41b9e207 (patch) | |
| tree | cf04cd725d6037fca5dac178cd25843aadb3659a /weed | |
| parent | d218fe54faa9e8603024c6957685d6c60941ef09 (diff) | |
| parent | 2f474c33d0f650d43ecced7017388b6b567a400b (diff) | |
| download | seaweedfs-aba212deddf381407596ff98e7713f1b41b9e207.tar.xz seaweedfs-aba212deddf381407596ff98e7713f1b41b9e207.zip | |
Merge branch 'master' of https://github.com/seaweedfs/seaweedfs
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/fix.go | 58 | ||||
| -rw-r--r-- | weed/command/scaffold/filer.toml | 7 | ||||
| -rw-r--r-- | weed/filer/mongodb/mongodb_store.go | 72 | ||||
| -rw-r--r-- | weed/iamapi/iamapi_management_handlers.go | 5 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 3 | ||||
| -rw-r--r-- | weed/s3api/auth_credentials.go | 4 | ||||
| -rw-r--r-- | weed/s3api/auth_credentials_test.go | 15 | ||||
| -rw-r--r-- | weed/s3api/s3_constants/s3_actions.go | 15 | ||||
| -rw-r--r-- | weed/s3api/s3_constants/s3_config.go | 2 | ||||
| -rw-r--r-- | weed/s3api/s3api_bucket_handlers.go | 12 | ||||
| -rw-r--r-- | weed/s3api/s3api_server.go | 2 | ||||
| -rw-r--r-- | weed/stats/disk_notsupported.go | 4 | ||||
| -rw-r--r-- | weed/stats/disk_openbsd.go | 25 | ||||
| -rw-r--r-- | weed/storage/needle_map_memory.go | 4 | ||||
| -rw-r--r-- | weed/storage/volume_checking.go | 7 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum.go | 26 |
16 files changed, 202 insertions, 59 deletions
diff --git a/weed/command/fix.go b/weed/command/fix.go index b226a0b1a..4fb4ed88e 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -32,12 +32,15 @@ var cmdFix = &Command{ var ( fixVolumeCollection = cmdFix.Flag.String("collection", "", "an optional volume collection name, if specified only it will be processed") fixVolumeId = cmdFix.Flag.Int64("volumeId", 0, "an optional volume id, if not 0 (default) only it will be processed") + fixIncludeDeleted = cmdFix.Flag.Bool("includeDeleted", true, "include deleted entries in the index file") fixIgnoreError = cmdFix.Flag.Bool("ignoreError", false, "an optional, if true will be processed despite errors") ) type VolumeFileScanner4Fix struct { - version needle.Version - nm *needle_map.MemDb + version needle.Version + nm *needle_map.MemDb + nmDeleted *needle_map.MemDb + includeDeleted bool } func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error { @@ -50,13 +53,20 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { } func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { - glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed()) + glog.V(2).Infof("key %v offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed()) if n.Size.IsValid() { - pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size) - glog.V(2).Infof("saved %d with error %v", n.Size, pe) + if pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size); pe != nil { + return fmt.Errorf("saved %d with error %v", n.Size, pe) + } } else { - glog.V(2).Infof("skipping deleted file ...") - return scanner.nm.Delete(n.Id) + if scanner.includeDeleted { + if pe := scanner.nmDeleted.Set(n.Id, types.ToOffset(offset), types.TombstoneFileSize); pe != nil { + return fmt.Errorf("saved deleted %d with error %v", n.Size, pe) + } + } else { + glog.V(2).Infof("skipping deleted file ...") + return scanner.nm.Delete(n.Id) + } } return nil } @@ -109,21 +119,45 @@ func runFix(cmd *Command, args []string) bool { if *fixVolumeId != 0 && *fixVolumeId != volumeId { continue } - doFixOneVolume(basePath, baseFileName, collection, volumeId) + doFixOneVolume(basePath, baseFileName, collection, volumeId, *fixIncludeDeleted) } } return true } -func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64) { +func SaveToIdx(scaner *VolumeFileScanner4Fix, idxName string) (ret error) { + idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return + } + defer func() { + idxFile.Close() + }() + + return scaner.nm.AscendingVisit(func(value needle_map.NeedleValue) error { + _, err := idxFile.Write(value.ToBytes()) + if scaner.includeDeleted && err == nil { + if deleted, ok := scaner.nmDeleted.Get(value.Key); ok { + _, err = idxFile.Write(deleted.ToBytes()) + } + } + return err + }) +} + +func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64, fixIncludeDeleted bool) { indexFileName := path.Join(basepath, baseFileName+".idx") nm := needle_map.NewMemDb() + nmDeleted := needle_map.NewMemDb() defer nm.Close() + defer nmDeleted.Close() vid := needle.VolumeId(volumeId) scanner := &VolumeFileScanner4Fix{ - nm: nm, + nm: nm, + nmDeleted: nmDeleted, + includeDeleted: fixIncludeDeleted, } if err := storage.ScanVolumeFile(basepath, collection, vid, storage.NeedleMapInMemory, scanner); err != nil { @@ -135,12 +169,12 @@ func doFixOneVolume(basepath string, baseFileName string, collection string, vol } } - if err := nm.SaveToIdx(indexFileName); err != nil { - os.Remove(indexFileName) + if err := SaveToIdx(scanner, indexFileName); err != nil { err := fmt.Errorf("save to .idx File: %v", err) if *fixIgnoreError { glog.Error(err) } else { + os.Remove(indexFileName) glog.Fatal(err) } } diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 30a9cae51..574125207 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -280,6 +280,13 @@ tls_client_key_file="" [mongodb] enabled = false uri = "mongodb://localhost:27017" +username = "" +password = "" +ssl = false +ssl_ca_file = "" +ssl_cert_file = "" +ssl_key_file = " +insecure_skip_verify = false option_pool_size = 0 database = "seaweedfs" diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 8ebbc6ab0..fbaa464b9 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -2,7 +2,12 @@ package mongodb import ( "context" + "crypto/tls" + "crypto/x509" "fmt" + "os" + "time" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -10,7 +15,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "time" ) func init() { @@ -37,17 +41,44 @@ func (store *MongodbStore) Initialize(configuration util.Configuration, prefix s store.database = configuration.GetString(prefix + "database") store.collectionName = "filemeta" poolSize := configuration.GetInt(prefix + "option_pool_size") - return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize)) + uri := configuration.GetString(prefix + "uri") + ssl := configuration.GetBool(prefix + "ssl") + sslCAFile := configuration.GetString(prefix + "ssl_ca_file") + sslCertFile := configuration.GetString(prefix + "ssl_cert_file") + sslKeyFile := configuration.GetString(prefix + "ssl_key_file") + username := configuration.GetString(prefix + "username") + password := configuration.GetString(prefix + "password") + insecure_skip_verify := configuration.GetBool(prefix + "insecure_skip_verify") + + return store.connection(uri, uint64(poolSize), ssl, sslCAFile, sslCertFile, sslKeyFile, username, password, insecure_skip_verify) } -func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) { - ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) +func (store *MongodbStore) connection(uri string, poolSize uint64, ssl bool, sslCAFile, sslCertFile, sslKeyFile string, username, password string, insecure bool) (err error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + opts := options.Client().ApplyURI(uri) if poolSize > 0 { opts.SetMaxPoolSize(poolSize) } + if ssl { + tlsConfig, err := configureTLS(sslCAFile, sslCertFile, sslKeyFile, insecure) + if err != nil { + return err + } + opts.SetTLSConfig(tlsConfig) + } + + if username != "" && password != "" { + creds := options.Credential{ + Username: username, + Password: password, + } + opts.SetAuth(creds) + } + client, err := mongo.Connect(ctx, opts) if err != nil { return err @@ -55,10 +86,36 @@ func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) { c := client.Database(store.database).Collection(store.collectionName) err = store.indexUnique(c) + store.connect = client return err } +func configureTLS(caFile, certFile, keyFile string, insecure bool) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, fmt.Errorf("could not load client key pair: %s", err) + } + + caCert, err := os.ReadFile(caFile) + if err != nil { + return nil, fmt.Errorf("could not read CA certificate: %s", err) + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to append CA certificate") + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: insecure, + } + + return tlsConfig, nil +} + func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error { _, err := c.Indexes().CreateOne(context.Background(), index, opts) return err @@ -93,13 +150,10 @@ func (store *MongodbStore) RollbackTransaction(ctx context.Context) error { } func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { - return store.UpdateEntry(ctx, entry) - } func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { - dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() if err != nil { @@ -126,7 +180,6 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) } func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { - dir, name := fullpath.DirAndName() var data Model @@ -154,7 +207,6 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath } func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - dir, name := fullpath.DirAndName() where := bson.M{"directory": dir, "name": name} @@ -167,7 +219,6 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa } func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - where := bson.M{"directory": fullpath} _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) if err != nil { @@ -182,7 +233,6 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir } func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}} if includeStartFile { where["name"] = bson.M{ diff --git a/weed/iamapi/iamapi_management_handlers.go b/weed/iamapi/iamapi_management_handlers.go index d63bc8849..6b0f9bbfc 100644 --- a/weed/iamapi/iamapi_management_handlers.go +++ b/weed/iamapi/iamapi_management_handlers.go @@ -33,6 +33,7 @@ const ( StatementActionReadAcp = "GetBucketAcl" StatementActionList = "List*" StatementActionTagging = "Tagging*" + StatementActionDelete = "DeleteBucket*" ) var ( @@ -58,6 +59,8 @@ func MapToStatementAction(action string) string { return s3_constants.ACTION_LIST case StatementActionTagging: return s3_constants.ACTION_TAGGING + case StatementActionDelete: + return s3_constants.ACTION_DELETE_BUCKET default: return "" } @@ -79,6 +82,8 @@ func MapToIdentitiesAction(action string) string { return StatementActionList case s3_constants.ACTION_TAGGING: return StatementActionTagging + case s3_constants.ACTION_DELETE_BUCKET: + return StatementActionDelete default: return "" } diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 81acd9a2d..6e7549cdc 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "strconv" "strings" + "encoding/base64" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -202,7 +203,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures Tagging: aws.String(tags), } if len(entry.Attributes.Md5) > 0 { - uploadInput.ContentMD5 = aws.String(fmt.Sprintf("%x", entry.Attributes.Md5)) + uploadInput.ContentMD5 = aws.String(base64.StdEncoding.EncodeToString([]byte(entry.Attributes.Md5))) } _, err = uploader.Upload(&uploadInput) diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index a2b1fd90f..6121aecba 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -317,6 +317,7 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt } identity, errCode := iam.authRequest(r, action) + glog.V(3).Infof("auth error: %v", errCode) if errCode == s3err.ErrNone { if identity != nil && identity.Name != "" { r.Header.Set(s3_constants.AmzIdentityId, identity.Name) @@ -453,6 +454,7 @@ func (identity *Identity) canDo(action Action, bucket string, objectKey string) } } if bucket == "" { + glog.V(3).Infof("identity %s is not allowed to perform action %s on %s -- bucket is empty", identity.Name, action, bucket+objectKey) return false } target := string(action) + ":" + bucket + objectKey @@ -477,6 +479,8 @@ func (identity *Identity) canDo(action Action, bucket string, objectKey string) } } } + //log error + glog.V(3).Infof("identity %s is not allowed to perform action %s on %s", identity.Name, action, bucket+objectKey) return false } diff --git a/weed/s3api/auth_credentials_test.go b/weed/s3api/auth_credentials_test.go index f9f87fc54..1d9f1a95f 100644 --- a/weed/s3api/auth_credentials_test.go +++ b/weed/s3api/auth_credentials_test.go @@ -1,11 +1,12 @@ package s3api import ( - . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/stretchr/testify/assert" "reflect" "testing" + . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/stretchr/testify/assert" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" jsonpb "google.golang.org/protobuf/encoding/protojson" ) @@ -79,6 +80,7 @@ func TestCanDo(t *testing.T) { } // object specific assert.Equal(t, true, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt")) + assert.Equal(t, false, ident1.canDo(ACTION_DELETE_BUCKET, "bucket1", "")) assert.Equal(t, false, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/other/some"), "action without *") // bucket specific @@ -141,6 +143,15 @@ func TestCanDo(t *testing.T) { }, } assert.Equal(t, true, ident6.canDo(ACTION_READ, "anything_bucket", "/a/b/c/d.txt")) + + //test deleteBucket operation + ident7 := &Identity{ + Name: "anything", + Actions: []Action{ + "DeleteBucket:bucket1", + }, + } + assert.Equal(t, true, ident7.canDo(ACTION_DELETE_BUCKET, "bucket1", "")) } type LoadS3ApiConfigurationTestCase struct { diff --git a/weed/s3api/s3_constants/s3_actions.go b/weed/s3api/s3_constants/s3_actions.go index 8d770e408..864979784 100644 --- a/weed/s3api/s3_constants/s3_actions.go +++ b/weed/s3api/s3_constants/s3_actions.go @@ -1,13 +1,14 @@ package s3_constants const ( - ACTION_READ = "Read" - ACTION_READ_ACP = "ReadAcp" - ACTION_WRITE = "Write" - ACTION_WRITE_ACP = "WriteAcp" - ACTION_ADMIN = "Admin" - ACTION_TAGGING = "Tagging" - ACTION_LIST = "List" + ACTION_READ = "Read" + ACTION_READ_ACP = "ReadAcp" + ACTION_WRITE = "Write" + ACTION_WRITE_ACP = "WriteAcp" + ACTION_ADMIN = "Admin" + ACTION_TAGGING = "Tagging" + ACTION_LIST = "List" + ACTION_DELETE_BUCKET = "DeleteBucket" SeaweedStorageDestinationHeader = "x-seaweedfs-destination" MultipartUploadsFolder = ".uploads" diff --git a/weed/s3api/s3_constants/s3_config.go b/weed/s3api/s3_constants/s3_config.go index cb44b9484..d2d2c257a 100644 --- a/weed/s3api/s3_constants/s3_config.go +++ b/weed/s3api/s3_constants/s3_config.go @@ -7,7 +7,7 @@ import ( var ( CircuitBreakerConfigDir = "/etc/s3" CircuitBreakerConfigFile = "circuit_breaker.json" - AllowedActions = []string{ACTION_READ, ACTION_READ_ACP, ACTION_WRITE, ACTION_WRITE_ACP, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN} + AllowedActions = []string{ACTION_READ, ACTION_READ_ACP, ACTION_WRITE, ACTION_WRITE_ACP, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN, ACTION_DELETE_BUCKET} LimitTypeCount = "Count" LimitTypeBytes = "MB" Separator = ":" diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 151bdaca5..12d2c0432 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -6,14 +6,15 @@ import ( "encoding/xml" "errors" "fmt" - "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" - "github.com/seaweedfs/seaweedfs/weed/util" "math" "net/http" "strings" "time" + "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/storage/needle" @@ -218,6 +219,10 @@ func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorC return s3err.ErrNoSuchBucket } + //if iam is enabled, the access was already checked before + if s3a.iam.isEnabled() { + return s3err.ErrNone + } if !s3a.hasAccess(r, entry) { return s3err.ErrAccessDenied } @@ -236,6 +241,7 @@ func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool { identityId := r.Header.Get(s3_constants.AmzIdentityId) if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok { if identityId != string(id) { + glog.V(3).Infof("hasAccess: %s != %s (entry.Extended = %v)", identityId, id, entry.Extended) return false } } diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 1477d650f..9422318ce 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -279,7 +279,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketHandler, ACTION_ADMIN)), "PUT")) // DeleteBucket - bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_ADMIN)), "DELETE")) + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_DELETE_BUCKET)), "DELETE")) // ListObjectsV1 (Legacy) bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST")) diff --git a/weed/stats/disk_notsupported.go b/weed/stats/disk_notsupported.go index 1da714c73..418164546 100644 --- a/weed/stats/disk_notsupported.go +++ b/weed/stats/disk_notsupported.go @@ -1,5 +1,5 @@ -//go:build openbsd || netbsd || plan9 || solaris -// +build openbsd netbsd plan9 solaris +//go:build netbsd || plan9 || solaris +// +build netbsd plan9 solaris package stats diff --git a/weed/stats/disk_openbsd.go b/weed/stats/disk_openbsd.go new file mode 100644 index 000000000..8224e626e --- /dev/null +++ b/weed/stats/disk_openbsd.go @@ -0,0 +1,25 @@ +//go:build openbsd +// +build openbsd + +package stats + +import ( + "syscall" + + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" +) + +func fillInDiskStatus(disk *volume_server_pb.DiskStatus) { + fs := syscall.Statfs_t{} + err := syscall.Statfs(disk.Dir, &fs) + if err != nil { + return + } + disk.All = fs.F_blocks * uint64(fs.F_bsize) + disk.Free = fs.F_bfree * uint64(fs.F_bsize) + disk.Used = disk.All - disk.Free + disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100) + disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100) + return +} + diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index a2beb6c33..c75514a31 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -36,8 +36,8 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { e := idx.WalkIndexFile(file, 0, func(key NeedleId, offset Offset, size Size) error { nm.MaybeSetMaxFileKey(key) - nm.FileCounter++ if !offset.IsZero() && size.IsValid() { + nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) if !oldOffset.IsZero() && oldSize.IsValid() { @@ -51,7 +51,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { } return nil }) - glog.V(1).Infof("max file key: %d for file: %s", nm.MaxFileKey(), file.Name()) + glog.V(1).Infof("max file key: %v count: %d deleted: %d for file: %s", nm.MaxFileKey(), nm.FileCount(), nm.DeletedCount(), file.Name()) return nm, e } diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 0c5f154e8..f5ceffcce 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -109,9 +109,6 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err) } n.AppendAtNs = util.BytesToUint64(bytes) - if n.HasTtl() { - return n.AppendAtNs, nil - } fileTailOffset := offset + needle.GetActualSize(size, v) fileSize, _, err := datFile.GetStat() if err != nil { @@ -130,7 +127,7 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err) } if n.Id != key { - return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id) + return n.AppendAtNs, fmt.Errorf("index key %v does not match needle's Id %v", key, n.Id) } return n.AppendAtNs, err } @@ -147,7 +144,7 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err) } if n.Id != key { - return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id) + return n.AppendAtNs, fmt.Errorf("index key %v does not match needle's Id %v", key, n.Id) } return n.AppendAtNs, err } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index c8098493d..6bbbde71d 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -487,19 +487,21 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da if err != nil { return err } - dstDatSize, _, err := dstDatBackend.GetStat() - if err != nil { - return err - } - if v.nm.ContentSize() > v.nm.DeletedSize() { - expectedContentSize := v.nm.ContentSize() - v.nm.DeletedSize() - if expectedContentSize > uint64(dstDatSize) { - return fmt.Errorf("volume %s unexpected new data size: %d does not match size of content minus deleted: %d", - v.Id.String(), dstDatSize, expectedContentSize) + if v.Ttl.String() == "" { + dstDatSize, _, err := dstDatBackend.GetStat() + if err != nil { + return err + } + if v.nm.ContentSize() > v.nm.DeletedSize() { + expectedContentSize := v.nm.ContentSize() - v.nm.DeletedSize() + if expectedContentSize > uint64(dstDatSize) { + return fmt.Errorf("volume %s unexpected new data size: %d does not match size of content minus deleted: %d", + v.Id.String(), dstDatSize, expectedContentSize) + } + } else { + glog.Warningf("volume %s content size: %d less deleted size: %d, new size: %d", + v.Id.String(), v.nm.ContentSize(), v.nm.DeletedSize(), dstDatSize) } - } else { - glog.Warningf("volume %s content size: %d less deleted size: %d, new size: %d", - v.Id.String(), v.nm.ContentSize(), v.nm.DeletedSize(), dstDatSize) } err = newNm.SaveToIdx(datIdxName) if err != nil { |
