aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2019-11-11 12:43:24 -0800
committerGitHub <noreply@github.com>2019-11-11 12:43:24 -0800
commitd7852ebb3b8900baeff3d806f8fe2404c511aa5d (patch)
treeb41c48bff0926c422324bd1286ffb37abb990c93
parent9c2f3b1b0f2b7e4617975edde4f4872ff6418641 (diff)
parentd07701fa757991df6397ffde5887a70e5361d1f4 (diff)
downloadseaweedfs-d7852ebb3b8900baeff3d806f8fe2404c511aa5d.tar.xz
seaweedfs-d7852ebb3b8900baeff3d806f8fe2404c511aa5d.zip
Merge pull request #1102 from stlpmo-jn/add_ETCD_sequencer
Add etcd sequencer
-rw-r--r--weed/command/scaffold.go7
-rw-r--r--weed/sequence/etcd_sequencer.go296
-rw-r--r--weed/server/master_server.go38
-rw-r--r--weed/util/config.go4
4 files changed, 342 insertions, 3 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 7a988cdcf..6fa72c730 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -346,5 +346,12 @@ scripts = """
"""
sleep_minutes = 17 # sleep minutes between each script execution
+sequencer_type = memory # Choose [memory|etcd] type for storing the file id sequence
+
+# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = http://127.0.0.1:2379
+
+
`
)
diff --git a/weed/sequence/etcd_sequencer.go b/weed/sequence/etcd_sequencer.go
new file mode 100644
index 000000000..1fc378640
--- /dev/null
+++ b/weed/sequence/etcd_sequencer.go
@@ -0,0 +1,296 @@
+package sequence
+
+/*
+Note :
+(1) store the sequence in the ETCD cluster, and local file(sequence.dat)
+(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file
+(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId.
+*/
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "io"
+ "os"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "go.etcd.io/etcd/client"
+)
+
+const (
+ // EtcdKeyPrefix = "/seaweedfs"
+ EtcdKeySequence = "/master/sequence"
+ EtcdContextTimeoutSecond = 100 * time.Second
+ DefaultEtcdSteps uint64 = 500 // internal counter
+ SequencerFileName = "sequencer.dat"
+ FileMaxSequenceLength = 128
+)
+
+type EtcdSequencer struct {
+ sequenceLock sync.Mutex
+
+ // available sequence range : [currentSeqId, maxSeqId)
+ currentSeqId uint64
+ maxSeqId uint64
+
+ keysAPI client.KeysAPI
+ seqFile *os.File
+}
+
+func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) {
+ file, err := openSequenceFile(metaFolder + "/" + SequencerFileName)
+ if nil != err {
+ return nil, fmt.Errorf("open sequence file fialed, %v", err)
+ }
+
+ cli, err := client.New(client.Config{
+ Endpoints: strings.Split(etcdUrls, ","),
+ Username: "",
+ Password: "",
+ })
+ if err != nil {
+ return nil, err
+ }
+ keysApi := client.NewKeysAPI(cli)
+
+ // TODO: the current sequence id in local file is not used
+ maxValue, _, err := readSequenceFile(file)
+ if err != nil {
+ return nil, fmt.Errorf("read sequence from file failed, %v", err)
+ }
+ glog.V(4).Infof("read sequence from file : %d", maxValue)
+
+ newSeq, err := setMaxSequenceToEtcd(keysApi, maxValue)
+ if err != nil {
+ return nil, err
+ }
+
+ sequencer := &EtcdSequencer{maxSeqId: newSeq,
+ currentSeqId: newSeq,
+ keysAPI: keysApi,
+ seqFile: file,
+ }
+ return sequencer, nil
+}
+
+func (es *EtcdSequencer) NextFileId(count uint64) uint64 {
+ es.sequenceLock.Lock()
+ defer es.sequenceLock.Unlock()
+
+ if (es.currentSeqId + count) >= es.maxSeqId {
+ reqSteps := DefaultEtcdSteps
+ if count > DefaultEtcdSteps {
+ reqSteps += count
+ }
+ maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps)
+ glog.V(4).Infof("get max sequence id from etcd, %d", maxId)
+ if err != nil {
+ glog.Error(err)
+ return 0
+ }
+ es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId
+ glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId)
+
+ if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil {
+ glog.Errorf("flush sequence to file failed, %v", err)
+ }
+ }
+
+ ret := es.currentSeqId
+ es.currentSeqId += count
+ return ret
+}
+
+/**
+instead of collecting the max value from volume server,
+the max value should be saved in local config file and ETCD cluster
+*/
+func (es *EtcdSequencer) SetMax(seenValue uint64) {
+ es.sequenceLock.Lock()
+ defer es.sequenceLock.Unlock()
+ if seenValue > es.maxSeqId {
+ maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue)
+ if err != nil {
+ glog.Errorf("set Etcd Max sequence failed : %v", err)
+ return
+ }
+ es.currentSeqId, es.maxSeqId = maxId, maxId
+
+ if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil {
+ glog.Errorf("flush sequence to file failed, %v", err)
+ }
+ }
+}
+
+func (es *EtcdSequencer) GetMax() uint64 {
+ return es.maxSeqId
+}
+
+func (es *EtcdSequencer) Peek() uint64 {
+ return es.currentSeqId
+}
+
+func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) {
+ if step <= 0 {
+ return 0, fmt.Errorf("the step must be large than 1")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
+ var endSeqValue uint64 = 0
+ defer cancel()
+ for {
+ getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
+ if err != nil {
+ return 0, err
+ }
+ if getResp.Node == nil {
+ continue
+ }
+
+ prevValue := getResp.Node.Value
+ prevSeqValue, err := strconv.ParseUint(prevValue, 10, 64)
+ if err != nil {
+ return 0, fmt.Errorf("get sequence from etcd failed, %v", err)
+ }
+ endSeqValue = prevSeqValue + step
+ endSeqStr := strconv.FormatUint(endSeqValue, 10)
+
+ _, err = kvApi.Set(ctx, EtcdKeySequence, endSeqStr, &client.SetOptions{PrevValue: prevValue})
+ if err == nil {
+ break
+ }
+ glog.Error(err)
+ }
+
+ return endSeqValue, nil
+}
+
+/**
+update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
+when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq,
+return the value of EtcdKeySequence in the ETCD cluster;
+when the value of the EtcdKeySequence is less than the parameter maxSeq,
+return the value of the parameter maxSeq
+*/
+func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
+ maxSeqStr := strconv.FormatUint(maxSeq, 10)
+ ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
+ defer cancel()
+
+ for {
+ getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
+ if err != nil {
+ if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) {
+ _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr)
+ if err == nil {
+ continue
+ }
+ if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) {
+ continue
+ }
+ return 0, err
+ } else {
+ return 0, err
+ }
+ }
+
+ if getResp.Node == nil {
+ continue
+ }
+ prevSeqStr := getResp.Node.Value
+ prevSeq, err := strconv.ParseUint(prevSeqStr, 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ if prevSeq >= maxSeq {
+ return prevSeq, nil
+ }
+
+ _, err = kvApi.Set(ctx, EtcdKeySequence, maxSeqStr, &client.SetOptions{PrevValue: prevSeqStr})
+ if err != nil {
+ return 0, err
+ }
+ }
+}
+
+func openSequenceFile(file string) (*os.File, error) {
+ _, err := os.Stat(file)
+ if os.IsNotExist(err) {
+ fid, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
+ if err != nil {
+ return nil, err
+ }
+ if err := writeSequenceFile(fid, 1, 0); err != nil {
+ return nil, err
+ }
+ return fid, nil
+ } else {
+ return os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
+ }
+}
+
+/*
+read sequence and step from sequence file
+*/
+func readSequenceFile(file *os.File) (uint64, uint64, error) {
+ sequence := make([]byte, FileMaxSequenceLength)
+ size, err := file.ReadAt(sequence, 0)
+ if (err != nil) && (err != io.EOF) {
+ err := fmt.Errorf("cannot read file %s, %v", file.Name(), err)
+ return 0, 0, err
+ }
+ sequence = sequence[0:size]
+ seqs := strings.Split(string(sequence), ":")
+ maxId, err := strconv.ParseUint(seqs[0], 10, 64)
+ if err != nil {
+ return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
+ }
+
+ if len(seqs) > 1 {
+ step, err := strconv.ParseUint(seqs[1], 10, 64)
+ if err != nil {
+ return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
+ }
+ return maxId, step, nil
+ }
+
+ return maxId, 0, nil
+}
+
+/**
+write the sequence and step to sequence file
+*/
+func writeSequenceFile(file *os.File, sequence, step uint64) error {
+ _ = step
+ seqStr := fmt.Sprintf("%d:%d", sequence, sequence)
+ if _, err := file.Seek(0, 0); err != nil {
+ err = fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
+ return err
+ }
+ if err := file.Truncate(0); err != nil {
+ return fmt.Errorf("truncate sequence file faield : %v", err)
+ }
+ if _, err := file.WriteString(seqStr); err != nil {
+ return fmt.Errorf("write file %s failed, %v", file.Name(), err)
+ }
+ if err := file.Sync(); err != nil {
+ return fmt.Errorf("flush file %s failed, %v", file.Name(), err)
+ }
+ return nil
+}
+
+// the UT helper method
+// func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
+// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
+// defer cancel()
+// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
+// if err != nil {
+// return err
+// }
+// return nil
+// }
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index cde583560..15e6ee51c 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -27,6 +27,12 @@ import (
"google.golang.org/grpc"
)
+const (
+ MasterPrefix = "master.maintenance"
+ SequencerType = MasterPrefix + ".sequencer_type"
+ SequencerEtcdUrls = MasterPrefix + ".sequencer_etcd_urls"
+)
+
type MasterOption struct {
Port int
MetaFolder string
@@ -87,7 +93,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@@ -165,8 +175,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
- //drop it to the floor
- //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ // drop it to the floor
+ // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
@@ -230,3 +240,25 @@ func (ms *MasterServer) startAdminScripts() {
}
}()
}
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ seqType := strings.ToLower(util.Config().GetString(SequencerType))
+ glog.V(0).Infof("[%s] : [%s]", SequencerType, seqType)
+ switch strings.ToLower(seqType) {
+ case "memory":
+ seq = sequence.NewMemorySequencer()
+ case "etcd":
+ var err error
+ urls := util.Config().GetString(SequencerEtcdUrls)
+ glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
+ seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
+ default:
+ seq = sequence.NewMemorySequencer()
+ }
+ return seq
+}
diff --git a/weed/util/config.go b/weed/util/config.go
index 1ea833d1f..7e2f9b373 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -40,5 +40,9 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
}
return true
+}
+func Config() Configuration {
+ return viper.GetViper()
}
+