diff options
| -rw-r--r-- | README.md | 2 | ||||
| -rw-r--r-- | weed/command/server.go | 12 | ||||
| -rw-r--r-- | weed/pb/grpc_client_server.go | 1 | ||||
| -rw-r--r-- | weed/shell/command_volume_mark.go | 55 | ||||
| -rw-r--r-- | weed/shell/command_volume_move.go | 15 | ||||
| -rw-r--r-- | weed/storage/disk_location.go | 3 | ||||
| -rw-r--r-- | weed/storage/store.go | 30 | ||||
| -rw-r--r-- | weed/storage/volume_checking.go | 65 | ||||
| -rw-r--r-- | weed/storage/volume_loading.go | 2 |
9 files changed, 159 insertions, 26 deletions
@@ -101,7 +101,7 @@ On top of the object store, optional [Filer] can support directories and POSIX a * Automatic compaction to reclaim disk space after deletion or update. * [Automatic entry TTL expiration][VolumeServerTTL]. * Any server with some disk spaces can add to the total storage space. -* Adding/Removing servers does **not** cause any data re-balancing. +* Adding/Removing servers does **not** cause any data re-balancing unless triggered by admin commands. * Optional picture resizing. * Support ETag, Accept-Range, Last-Modified, etc. * Support in-memory/leveldb/readonly mode tuning for memory/performance balance. diff --git a/weed/command/server.go b/weed/command/server.go index 6a78fb3f4..d06b1d40a 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -2,13 +2,14 @@ package command import ( "fmt" - stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "os" "runtime" "runtime/pprof" "strings" "time" + stats_collect "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -60,9 +61,10 @@ var ( serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") - isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") - isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") + isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server") + isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") + isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") + isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") serverWhiteList []string @@ -214,7 +216,7 @@ func runServer(cmd *Command, args []string) bool { } // start volume server - { + if *isStartingVolumeServer { go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeMinFreeSpacePercent) } diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index ce706e282..f19af43b2 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -62,6 +62,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(Max_Message_Size), grpc.MaxCallRecvMsgSize(Max_Message_Size), + grpc.WaitForReady(true), ), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 30 * time.Second, // client ping server if no activity for this long diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go new file mode 100644 index 000000000..19b614310 --- /dev/null +++ b/weed/shell/command_volume_mark.go @@ -0,0 +1,55 @@ +package shell + +import ( + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeMark{}) +} + +type commandVolumeMark struct { +} + +func (c *commandVolumeMark) Name() string { + return "volume.mark" +} + +func (c *commandVolumeMark) Help() string { + return `Mark volume writable or readonly from one volume server + + volume.mark -node <volume server host:port> -volumeId <volume id> -writable or -readonly +` +} + +func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + volMarkCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := volMarkCommand.Int("volumeId", 0, "the volume id") + nodeStr := volMarkCommand.String("node", "", "the volume server <host>:<port>") + writable := volMarkCommand.Bool("writable", false, "volume mark writable") + readonly := volMarkCommand.Bool("readonly", false, "volume mark readonly") + if err = volMarkCommand.Parse(args); err != nil { + return nil + } + markWritable := false + if (*writable && *readonly) || (!*writable && !*readonly) { + return fmt.Errorf("use -readonly or -writable") + } else if *writable { + markWritable = true + } + + sourceVolumeServer := *nodeStr + + volumeId := needle.VolumeId(*volumeIdInt) + + return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable) +} diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index b136604e5..2bc8dfad8 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -166,3 +166,18 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour return deleteErr }) } + +func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if writable { + _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ + VolumeId: uint32(volumeId), + }) + } else { + _, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: uint32(volumeId), + }) + } + return err + }) +} diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 5dec21c32..1f58c9977 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -61,6 +61,7 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { name := fileInfo.Name() if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { + name := name[:len(name)-len(".idx")] noteFile := l.Directory + "/" + name + ".note" if util.FileExists(noteFile) { note, _ := ioutil.ReadFile(noteFile) @@ -93,7 +94,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne size, _, _ := v.FileStat() glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", - l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + l.Directory+"/"+name+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) return true } return false diff --git a/weed/storage/store.go b/weed/storage/store.go index b9fcfcba9..3a8c07e6e 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -200,7 +200,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { maxVolumeCount := 0 var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) - collectionVolumeReadOnlyCount := make(map[string]uint8) + collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId maxVolumeCount = maxVolumeCount + location.MaxVolumeCount @@ -220,11 +220,25 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } } collectionVolumeSize[v.Collection] += volumeMessage.Size + if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { + collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{ + "IsReadOnly": 0, + "noWriteOrDelete": 0, + "noWriteCanDelete": 0, + "isDiskSpaceLow": 0, + } + } if v.IsReadOnly() { - collectionVolumeReadOnlyCount[v.Collection] += 1 - } else { - if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { - collectionVolumeReadOnlyCount[v.Collection] = 0 + collectionVolumeReadOnlyCount[v.Collection] = make(map[string]uint8) + collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 + if v.noWriteOrDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 + } + if v.noWriteCanDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1 + } + if v.location.isDiskSpaceLow { + collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1 } } } @@ -251,8 +265,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size)) } - for col, count := range collectionVolumeReadOnlyCount { - stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, "normal").Set(float64(count)) + for col, types := range collectionVolumeReadOnlyCount { + for t, count := range types { + stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count)) + } } return &master_pb.Heartbeat{ diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index e42fb238b..78c693f5c 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -11,17 +12,28 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) { +func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) { var indexSize int64 - if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil { + return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err) } if indexSize == 0 { return 0, nil } + for i := 1; i <= 10; i++ { + // check and fix last 10 entries + lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize) + if err != ErrorSizeMismatch { + break + } + } + return +} + +func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) { var lastIdxEntry []byte - if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil { - return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil { + return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err) } key, offset, size := idx.IdxFileEntry(lastIdxEntry) if offset.IsZero() { @@ -29,15 +41,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin } if size < 0 { // read the deletion entry - if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil { + return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err) } } else { - if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil { + return lastAppendAtNs, err } } - return + return lastAppendAtNs, nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { @@ -60,7 +72,38 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err } func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) { - n := new(needle.Needle) + n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset) + if err != nil { + return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset) + } + if n.Size != size { + return 0, ErrorSizeMismatch + } + if v == needle.Version3 { + bytes := make([]byte, TimestampSize) + _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err != nil { + return 0, fmt.Errorf("check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err) + } + n.AppendAtNs = util.BytesToUint64(bytes) + fileTailOffset := offset + needle.GetActualSize(size, v) + fileSize, _, err := datFile.GetStat() + if err != nil { + return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err) + } + if fileSize == fileTailOffset { + return n.AppendAtNs, nil + } + if fileSize > fileTailOffset { + glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset) + err = datFile.Truncate(fileTailOffset) + if err == nil { + return n.AppendAtNs, nil + } + return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err) + } + glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset) + } if err = n.ReadData(datFile, offset, size, v); err != nil { return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 73e2de02b..05684cbdb 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -89,7 +89,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err) } } - if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil { + if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil { v.noWriteOrDelete = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err) } |
