aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/master.go6
-rw-r--r--weed/sequence/etcd_sequencer.go198
-rw-r--r--weed/server/master_server.go28
3 files changed, 91 insertions, 141 deletions
diff --git a/weed/command/master.go b/weed/command/master.go
index 3d33f4f7a..55e3409ed 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -37,6 +37,9 @@ type MasterOptions struct {
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
+
+ sequencerType *string
+ etcdUrls *string
}
func init() {
@@ -55,6 +58,9 @@ func init() {
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+ m.sequencerType = cmdMaster.Flag.String("sequencerType", "memory", "Choose [memory|etcd] type for store the file sequence")
+ m.etcdUrls = cmdMaster.Flag.String("etcdUrls", "",
+ "when sequencerType=etcd, set etcdUrls for etcd cluster that store file sequence, example : http://127.0.0.1:2379,http://127.0.0.1:2389")
}
var cmdMaster = &Command{
diff --git a/weed/sequence/etcd_sequencer.go b/weed/sequence/etcd_sequencer.go
index 51e0ec93f..1fc378640 100644
--- a/weed/sequence/etcd_sequencer.go
+++ b/weed/sequence/etcd_sequencer.go
@@ -1,21 +1,30 @@
package sequence
+/*
+Note :
+(1) store the sequence in the ETCD cluster, and local file(sequence.dat)
+(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file
+(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId.
+*/
+
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "go.etcd.io/etcd/client"
+ "sync"
+ "time"
+
"io"
"os"
"strconv"
"strings"
- "sync"
- "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "go.etcd.io/etcd/client"
)
const (
+ // EtcdKeyPrefix = "/seaweedfs"
EtcdKeySequence = "/master/sequence"
- EtcdKeyPrefix = "/seaweedfs"
EtcdContextTimeoutSecond = 100 * time.Second
DefaultEtcdSteps uint64 = 500 // internal counter
SequencerFileName = "sequencer.dat"
@@ -25,13 +34,12 @@ const (
type EtcdSequencer struct {
sequenceLock sync.Mutex
- // available sequence range : [steps, maxCounter)
- maxCounter uint64
- steps uint64
+ // available sequence range : [currentSeqId, maxSeqId)
+ currentSeqId uint64
+ maxSeqId uint64
- etcdClient client.Client
- keysAPI client.KeysAPI
- seqFile *os.File
+ keysAPI client.KeysAPI
+ seqFile *os.File
}
func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) {
@@ -50,6 +58,7 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error
}
keysApi := client.NewKeysAPI(cli)
+ // TODO: the current sequence id in local file is not used
maxValue, _, err := readSequenceFile(file)
if err != nil {
return nil, fmt.Errorf("read sequence from file failed, %v", err)
@@ -61,22 +70,19 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error
return nil, err
}
- // make the step and max the same, and then they are fake,
- // after invoking the NextFileId(), they are different and real
- maxCounter, steps := newSeq, newSeq
- sequencer := &EtcdSequencer{maxCounter: maxCounter,
- steps: steps,
- etcdClient: cli,
- keysAPI: keysApi,
- seqFile: file,
+ sequencer := &EtcdSequencer{maxSeqId: newSeq,
+ currentSeqId: newSeq,
+ keysAPI: keysApi,
+ seqFile: file,
}
return sequencer, nil
}
-func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) {
+func (es *EtcdSequencer) NextFileId(count uint64) uint64 {
es.sequenceLock.Lock()
defer es.sequenceLock.Unlock()
- if (es.steps + count) >= es.maxCounter {
+
+ if (es.currentSeqId + count) >= es.maxSeqId {
reqSteps := DefaultEtcdSteps
if count > DefaultEtcdSteps {
reqSteps += count
@@ -85,18 +91,19 @@ func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) {
glog.V(4).Infof("get max sequence id from etcd, %d", maxId)
if err != nil {
glog.Error(err)
- return 0, 0
+ return 0
}
- es.steps, es.maxCounter = maxId-reqSteps, maxId
- glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter)
+ es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId
+ glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId)
- if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil {
+ if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil {
glog.Errorf("flush sequence to file failed, %v", err)
}
}
- ret := es.steps
- es.steps += count
- return ret, count
+
+ ret := es.currentSeqId
+ es.currentSeqId += count
+ return ret
}
/**
@@ -106,13 +113,13 @@ the max value should be saved in local config file and ETCD cluster
func (es *EtcdSequencer) SetMax(seenValue uint64) {
es.sequenceLock.Lock()
defer es.sequenceLock.Unlock()
- if seenValue > es.maxCounter {
+ if seenValue > es.maxSeqId {
maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue)
if err != nil {
glog.Errorf("set Etcd Max sequence failed : %v", err)
return
}
- es.steps, es.maxCounter = maxId, maxId
+ es.currentSeqId, es.maxSeqId = maxId, maxId
if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil {
glog.Errorf("flush sequence to file failed, %v", err)
@@ -121,11 +128,11 @@ func (es *EtcdSequencer) SetMax(seenValue uint64) {
}
func (es *EtcdSequencer) GetMax() uint64 {
- return es.maxCounter
+ return es.maxSeqId
}
func (es *EtcdSequencer) Peek() uint64 {
- return es.steps
+ return es.currentSeqId
}
func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) {
@@ -164,8 +171,11 @@ func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error)
}
/**
- update the key of EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
-until the value of EtcdKeySequence is equal to or larger than the maxSeq
+update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
+when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq,
+return the value of EtcdKeySequence in the ETCD cluster;
+when the value of the EtcdKeySequence is less than the parameter maxSeq,
+return the value of the parameter maxSeq
*/
func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
maxSeqStr := strconv.FormatUint(maxSeq, 10)
@@ -178,10 +188,10 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) {
_, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr)
if err == nil {
- continue // create ETCD key success, retry get ETCD value
+ continue
}
if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) {
- continue // ETCD key exist, retry get ETCD value
+ continue
}
return 0, err
} else {
@@ -206,8 +216,6 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
return 0, err
}
}
-
- return maxSeq, nil
}
func openSequenceFile(file string) (*os.File, error) {
@@ -227,7 +235,7 @@ func openSequenceFile(file string) (*os.File, error) {
}
/*
- sequence : step 以冒号分割
+read sequence and step from sequence file
*/
func readSequenceFile(file *os.File) (uint64, uint64, error) {
sequence := make([]byte, FileMaxSequenceLength)
@@ -255,7 +263,7 @@ func readSequenceFile(file *os.File) (uint64, uint64, error) {
}
/**
-先不存放step到文件中
+write the sequence and step to sequence file
*/
func writeSequenceFile(file *os.File, sequence, step uint64) error {
_ = step
@@ -276,103 +284,13 @@ func writeSequenceFile(file *os.File, sequence, step uint64) error {
return nil
}
-func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
- ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
- defer cancel()
- _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
- if err != nil {
- return err
- }
- return nil
-}
-
-//func (es *EtcdSequencer) Load() error {
-// es.sequenceLock.Lock()
-// defer es.sequenceLock.Unlock()
-// reqSteps := DefaultEtcdSteps
-// maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps)
-// glog.V(4).Infof("get max sequence id from etcd, %d", maxId)
-// if err != nil {
-// glog.Error(err)
-// return err
-// }
-// es.steps, es.maxCounter = maxId-reqSteps, maxId
-// glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter)
-//
-// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil {
-// glog.Errorf("flush sequence to file failed, %v", err)
-// return err
-// }
-// return nil
-//}
-
-//func getEtcdKey(kv client.KeysAPI, key string) (string, error) {
-// resp, err := kv.Get(context.Background(), key, &client.GetOptions{Recursive: false, Quorum: true})
-// if err != nil {
-// glog.Warningf("key:%s result:%v", EtcdKeySequence, err)
-// return "", err
-// }
-// if resp.Node == nil {
-// return "", fmt.Errorf("the key is not exist, %s", key)
-// }
-// return resp.Node.Value, nil
-//}
-
-//func (es *EtcdSequencer) setLocalSequence(maxValue uint64) {
-// es.sequenceLock.Lock()
-// defer es.sequenceLock.Unlock()
-// if maxValue > es.maxCounter {
-// es.maxCounter, es.steps = maxValue, maxValue-DefaultEtcdSteps
-//
-// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil {
-// glog.Errorf("flush sequence to file failed, %v", err)
-// }
-// }
-//}
-
-//func getEtcdKeysApi(etcdUrls, user, passwd string) (client.KeysAPI, error) {
-// cli, err := client.New(client.Config{
-// Endpoints: strings.Split(etcdUrls, ","),
-// Username: user,
-// Password: passwd,
-// })
-// if err != nil {
-// return nil, err
-// }
-// keysApi := client.NewKeysAPI(cli)
-// return keysApi, nil
-//}
-
-//func (es *EtcdSequencer) asyncStartWatcher() {
-// es.startWatcher(es.keysAPI, EtcdKeySequence, func(value string, index uint64) {
-// newValue, err := strconv.ParseUint(value, 10, 64)
-// if err != nil {
-// glog.Warning(err)
-// }
-// es.setLocalSequence(newValue)
-// })
-//}
-
-//func (es *EtcdSequencer) startWatcher(kvApi client.KeysAPI, key string, callback func(value string, index uint64)) {
-// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
-// defer cancel()
-// ctx.Done()
-//
-// getResp, err := kvApi.Get(ctx, key, &client.GetOptions{Recursive: false, Quorum: true})
-// if err != nil {
-// return
-// }
-//
-// watcher := kvApi.Watcher(key, &client.WatcherOptions{AfterIndex: getResp.Index, Recursive: false})
-// go func(w client.Watcher) {
-// for {
-// resp, err := w.Next(context.Background())
-// if err != nil {
-// glog.Error(err)
-// continue
-// }
-// callback(resp.Node.Value, resp.Index)
-// }
-// }(watcher)
-// return
-//}
+// the UT helper method
+// func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
+// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
+// defer cancel()
+// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
+// if err != nil {
+// return err
+// }
+// return nil
+// }
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index cde583560..fd3236c53 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -39,6 +39,9 @@ type MasterOption struct {
DisableHttp bool
MetricsAddress string
MetricsIntervalSec int
+
+ sequencerType string
+ etcdUrls string
}
type MasterServer struct {
@@ -87,7 +90,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@@ -230,3 +237,22 @@ func (ms *MasterServer) startAdminScripts() {
}
}()
}
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ glog.V(0).Infof("sequencer type [%s]", option.sequencerType)
+ switch strings.ToLower(option.sequencerType) {
+ case "memory":
+ seq = sequence.NewMemorySequencer()
+ case "etcd":
+ var err error
+ seq, err = sequence.NewEtcdSequencer(option.etcdUrls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
+ default:
+ seq = sequence.NewMemorySequencer()
+ }
+ return seq
+}