about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- README.md 2
-rw-r--r-- weed/command/server.go 12
-rw-r--r-- weed/pb/grpc_client_server.go 1
-rw-r--r-- weed/shell/command_volume_mark.go 55
-rw-r--r-- weed/shell/command_volume_move.go 15
-rw-r--r-- weed/storage/disk_location.go 3
-rw-r--r-- weed/storage/store.go 30
-rw-r--r-- weed/storage/volume_checking.go 65
-rw-r--r-- weed/storage/volume_loading.go 2
9 files changed, 159 insertions, 26 deletions
diff --git a/README.md b/README.md
index 4d5a8a80d..ddf41b396 100644
--- a/README.md
+++ b/README.md
@@ -101,7 +101,7 @@ On top of the object store, optional [Filer] can support directories and POSIX a
* Automatic compaction to reclaim disk space after deletion or update.
* [Automatic entry TTL expiration][VolumeServerTTL].
* Any server with some disk spaces can add to the total storage space.
-* Adding/Removing servers does **not** cause any data re-balancing.
+* Adding/Removing servers does **not** cause any data re-balancing unless triggered by admin commands.
* Optional picture resizing.
* Support ETag, Accept-Range, Last-Modified, etc.
* Support in-memory/leveldb/readonly mode tuning for memory/performance balance.
diff --git a/weed/command/server.go b/weed/command/server.go
index 6a78fb3f4..d06b1d40a 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,13 +2,14 @@ package command
import (
"fmt"
- stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"os"
"runtime"
"runtime/pprof"
"strings"
"time"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -60,9 +61,10 @@ var (
serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
- isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
- isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
+ isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server")
+ isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
+ isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
+ isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string
@@ -214,7 +216,7 @@ func runServer(cmd *Command, args []string) bool {
}
// start volume server
- {
+ if *isStartingVolumeServer {
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeMinFreeSpacePercent)
}
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index ce706e282..f19af43b2 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -62,6 +62,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(Max_Message_Size),
grpc.MaxCallRecvMsgSize(Max_Message_Size),
+ grpc.WaitForReady(true),
),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // client ping server if no activity for this long
diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go
new file mode 100644
index 000000000..19b614310
--- /dev/null
+++ b/weed/shell/command_volume_mark.go
@@ -0,0 +1,55 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// init appends a commandVolumeMark instance to the package-level Commands list.
+func init() {
+ Commands = append(Commands, &commandVolumeMark{})
+}
+
+// commandVolumeMark is the receiver type for the "volume.mark" shell command.
+// It has no fields.
+type commandVolumeMark struct {
+}
+
+// Name returns the command name, "volume.mark".
+func (c *commandVolumeMark) Name() string {
+ return "volume.mark"
+}
+
+// Help returns the usage text for volume.mark, listing the -node, -volumeId,
+// -writable and -readonly flags.
+func (c *commandVolumeMark) Help() string {
+ return `Mark volume writable or readonly from one volume server
+
+ volume.mark -node <volume server host:port> -volumeId <volume id> -writable or -readonly
+`
+}
+
+// Do parses the volume.mark flags and forwards the request to
+// markVolumeWritable for the volume server named by -node.
+//
+// It returns the error from commandEnv.confirmIsLocked when that check fails.
+// A flag parsing failure is not propagated: nil is returned instead. Exactly
+// one of -writable and -readonly must be given, otherwise an error is returned.
+func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+	if err = commandEnv.confirmIsLocked(); err != nil {
+		return err
+	}
+
+	flags := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	volumeIdInt := flags.Int("volumeId", 0, "the volume id")
+	nodeStr := flags.String("node", "", "the volume server <host>:<port>")
+	writable := flags.Bool("writable", false, "volume mark writable")
+	readonly := flags.Bool("readonly", false, "volume mark readonly")
+	if parseErr := flags.Parse(args); parseErr != nil {
+		return nil
+	}
+
+	// Equal values mean both flags were set or neither was; either way the
+	// request is ambiguous.
+	if *writable == *readonly {
+		return fmt.Errorf("use -readonly or -writable")
+	}
+
+	// Past the check above, *writable alone decides the direction.
+	return markVolumeWritable(commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeIdInt), *nodeStr, *writable)
+}
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index b136604e5..2bc8dfad8 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -166,3 +166,18 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
return deleteErr
})
}
+
+// markVolumeWritable calls VolumeMarkWritable on sourceVolumeServer for
+// volumeId when writable is true, and VolumeMarkReadonly otherwise. The call
+// is made through operation.WithVolumeServerClient, whose result is returned.
+func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) {
+	vid := uint32(volumeId)
+	return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		if !writable {
+			_, rpcErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+				VolumeId: vid,
+			})
+			return rpcErr
+		}
+		_, rpcErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
+			VolumeId: vid,
+		})
+		return rpcErr
+	})
+}
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 5dec21c32..1f58c9977 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -61,6 +61,7 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
name := fileInfo.Name()
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
+ name := name[:len(name)-len(".idx")]
noteFile := l.Directory + "/" + name + ".note"
if util.FileExists(noteFile) {
note, _ := ioutil.ReadFile(noteFile)
@@ -93,7 +94,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
- l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ l.Directory+"/"+name+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
return true
}
return false
diff --git a/weed/storage/store.go b/weed/storage/store.go
index b9fcfcba9..3a8c07e6e 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -200,7 +200,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
maxVolumeCount := 0
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
- collectionVolumeReadOnlyCount := make(map[string]uint8)
+ collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
@@ -220,11 +220,25 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
}
collectionVolumeSize[v.Collection] += volumeMessage.Size
+ if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
+ collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
+ "IsReadOnly": 0,
+ "noWriteOrDelete": 0,
+ "noWriteCanDelete": 0,
+ "isDiskSpaceLow": 0,
+ }
+ }
if v.IsReadOnly() {
- collectionVolumeReadOnlyCount[v.Collection] += 1
- } else {
- if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
- collectionVolumeReadOnlyCount[v.Collection] = 0
+ collectionVolumeReadOnlyCount[v.Collection] = make(map[string]uint8)
+ collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
+ if v.noWriteOrDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
+ }
+ if v.noWriteCanDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1
+ }
+ if v.location.isDiskSpaceLow {
+ collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1
}
}
}
@@ -251,8 +265,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
- for col, count := range collectionVolumeReadOnlyCount {
- stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, "normal").Set(float64(count))
+ for col, types := range collectionVolumeReadOnlyCount {
+ for t, count := range types {
+ stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count))
+ }
}
return &master_pb.Heartbeat{
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index e42fb238b..78c693f5c 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -4,6 +4,7 @@ import (
"fmt"
"os"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -11,17 +12,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) {
+// CheckAndFixVolumeDataIntegrity validates the index file with
+// verifyIndexFileIntegrity and then checks index entries against the volume
+// data, starting from the last entry and walking backwards.
+//
+// An empty index yields (0, nil). Otherwise up to 10 trailing entries are
+// passed to doCheckAndFixVolumeData; the walk stops at the first entry whose
+// result is anything other than ErrorSizeMismatch, and that result is
+// returned. If every examined entry reports ErrorSizeMismatch, that error is
+// returned.
+func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
var indexSize int64
- if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
- return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
+ if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
+ return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
}
if indexSize == 0 {
return 0, nil
}
+ // Never step back past the first index entry: with fewer than 10 entries an
+ // unbounded walk would compute a negative offset and replace the
+ // size-mismatch result with an unrelated index read error.
+ for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ {
+ // check and fix last 10 entries
+ lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
+ if err != ErrorSizeMismatch {
+ break
+ }
+ }
+ return
+}
+
+func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
var lastIdxEntry []byte
- if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
- return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
+ if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil {
+ return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err)
}
key, offset, size := idx.IdxFileEntry(lastIdxEntry)
if offset.IsZero() {
@@ -29,15 +41,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
}
if size < 0 {
// read the deletion entry
- if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
}
} else {
- if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
+ return lastAppendAtNs, err
}
}
- return
+ return lastAppendAtNs, nil
}
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
@@ -60,7 +72,38 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
}
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
- n := new(needle.Needle)
+ n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
+ if err != nil {
+ return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
+ }
+ if n.Size != size {
+ return 0, ErrorSizeMismatch
+ }
+ if v == needle.Version3 {
+ bytes := make([]byte, TimestampSize)
+ _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
+ if err != nil {
+ return 0, fmt.Errorf("check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
+ }
+ n.AppendAtNs = util.BytesToUint64(bytes)
+ fileTailOffset := offset + needle.GetActualSize(size, v)
+ fileSize, _, err := datFile.GetStat()
+ if err != nil {
+ return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err)
+ }
+ if fileSize == fileTailOffset {
+ return n.AppendAtNs, nil
+ }
+ if fileSize > fileTailOffset {
+ glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ err = datFile.Truncate(fileTailOffset)
+ if err == nil {
+ return n.AppendAtNs, nil
+ }
+ return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err)
+ }
+ glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ }
if err = n.ReadData(datFile, offset, size, v); err != nil {
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 73e2de02b..05684cbdb 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -89,7 +89,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
}
}
- if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
+ if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}