aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-10 19:10:46 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-10 19:10:46 -0700
commitbf218cd59d0a7db1a8b5c55ccc93989b02a35738 (patch)
treee9a8e08837646c083972b58265643d8dd52f9f84
parent5ca0a551acd89932bdb79871682527186ed28f40 (diff)
downloadseaweedfs-bf218cd59d0a7db1a8b5c55ccc93989b02a35738.tar.xz
seaweedfs-bf218cd59d0a7db1a8b5c55ccc93989b02a35738.zip
removing etcd sequencer
causing go mod tidy problem. If anyone wants this, please help to resolve this first. github.com/chrislusf/seaweedfs/weed/sequence imports go.etcd.io/etcd/client tested by go.etcd.io/etcd/client.test imports github.com/coreos/etcd/integration imports github.com/coreos/etcd/proxy/grpcproxy imports google.golang.org/grpc/naming: module google.golang.org/grpc@latest found (v1.41.0), but does not contain package google.golang.org/grpc/naming
-rw-r--r--weed/command/scaffold/master.toml5
-rw-r--r--weed/sequence/etcd_sequencer.go296
-rw-r--r--weed/server/master_server.go10
3 files changed, 1 insertions, 310 deletions
diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml
index 020f48e36..363493db3 100644
--- a/weed/command/scaffold/master.toml
+++ b/weed/command/scaffold/master.toml
@@ -23,10 +23,7 @@ default = "localhost:8888" # used by maintenance scripts if the scripts needs
[master.sequencer]
-type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
-# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
-# example : http://127.0.0.1:2379,http://127.0.0.1:2389
-sequencer_etcd_urls = "http://127.0.0.1:2379"
+type = "raft" # Choose [raft|snowflake] type for storing the file id sequence
# when sequencer.type = snowflake, the snowflake id must be different from other masters
sequencer_snowflake_id = 0 # any number between 1~1023
diff --git a/weed/sequence/etcd_sequencer.go b/weed/sequence/etcd_sequencer.go
deleted file mode 100644
index a9f2bb97f..000000000
--- a/weed/sequence/etcd_sequencer.go
+++ /dev/null
@@ -1,296 +0,0 @@
-package sequence
-
-/*
-Note :
-(1) store the sequence in the ETCD cluster, and local file(sequence.dat)
-(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file
-(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId.
-*/
-
-import (
- "context"
- "fmt"
- "go.etcd.io/etcd/client"
- "sync"
- "time"
-
- "io"
- "os"
- "strconv"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
-)
-
-const (
- // EtcdKeyPrefix = "/seaweedfs"
- EtcdKeySequence = "/master/sequence"
- EtcdContextTimeoutSecond = 100 * time.Second
- DefaultEtcdSteps uint64 = 500 // internal counter
- SequencerFileName = "sequencer.dat"
- FileMaxSequenceLength = 128
-)
-
-type EtcdSequencer struct {
- sequenceLock sync.Mutex
-
- // available sequence range : [currentSeqId, maxSeqId)
- currentSeqId uint64
- maxSeqId uint64
-
- keysAPI client.KeysAPI
- seqFile *os.File
-}
-
-func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) {
- file, err := openSequenceFile(metaFolder + "/" + SequencerFileName)
- if nil != err {
- return nil, fmt.Errorf("open sequence file fialed, %v", err)
- }
-
- cli, err := client.New(client.Config{
- Endpoints: strings.Split(etcdUrls, ","),
- Username: "",
- Password: "",
- })
- if err != nil {
- return nil, err
- }
- keysApi := client.NewKeysAPI(cli)
-
- // TODO: the current sequence id in local file is not used
- maxValue, _, err := readSequenceFile(file)
- if err != nil {
- return nil, fmt.Errorf("read sequence from file failed, %v", err)
- }
- glog.V(4).Infof("read sequence from file : %d", maxValue)
-
- newSeq, err := setMaxSequenceToEtcd(keysApi, maxValue)
- if err != nil {
- return nil, err
- }
-
- sequencer := &EtcdSequencer{maxSeqId: newSeq,
- currentSeqId: newSeq,
- keysAPI: keysApi,
- seqFile: file,
- }
- return sequencer, nil
-}
-
-func (es *EtcdSequencer) NextFileId(count uint64) uint64 {
- es.sequenceLock.Lock()
- defer es.sequenceLock.Unlock()
-
- if (es.currentSeqId + count) >= es.maxSeqId {
- reqSteps := DefaultEtcdSteps
- if count > DefaultEtcdSteps {
- reqSteps += count
- }
- maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps)
- glog.V(4).Infof("get max sequence id from etcd, %d", maxId)
- if err != nil {
- glog.Error(err)
- return 0
- }
- es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId
- glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId)
-
- if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil {
- glog.Errorf("flush sequence to file failed, %v", err)
- }
- }
-
- ret := es.currentSeqId
- es.currentSeqId += count
- return ret
-}
-
-/**
-instead of collecting the max value from volume server,
-the max value should be saved in local config file and ETCD cluster
-*/
-func (es *EtcdSequencer) SetMax(seenValue uint64) {
- es.sequenceLock.Lock()
- defer es.sequenceLock.Unlock()
- if seenValue > es.maxSeqId {
- maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue)
- if err != nil {
- glog.Errorf("set Etcd Max sequence failed : %v", err)
- return
- }
- es.currentSeqId, es.maxSeqId = maxId, maxId
-
- if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil {
- glog.Errorf("flush sequence to file failed, %v", err)
- }
- }
-}
-
-func (es *EtcdSequencer) GetMax() uint64 {
- return es.maxSeqId
-}
-
-func (es *EtcdSequencer) Peek() uint64 {
- return es.currentSeqId
-}
-
-func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) {
- if step <= 0 {
- return 0, fmt.Errorf("the step must be large than 1")
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
- var endSeqValue uint64 = 0
- defer cancel()
- for {
- getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
- if err != nil {
- return 0, err
- }
- if getResp.Node == nil {
- continue
- }
-
- prevValue := getResp.Node.Value
- prevSeqValue, err := strconv.ParseUint(prevValue, 10, 64)
- if err != nil {
- return 0, fmt.Errorf("get sequence from etcd failed, %v", err)
- }
- endSeqValue = prevSeqValue + step
- endSeqStr := strconv.FormatUint(endSeqValue, 10)
-
- _, err = kvApi.Set(ctx, EtcdKeySequence, endSeqStr, &client.SetOptions{PrevValue: prevValue})
- if err == nil {
- break
- }
- glog.Error(err)
- }
-
- return endSeqValue, nil
-}
-
-/**
-update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
-when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq,
-return the value of EtcdKeySequence in the ETCD cluster;
-when the value of the EtcdKeySequence is less than the parameter maxSeq,
-return the value of the parameter maxSeq
-*/
-func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
- maxSeqStr := strconv.FormatUint(maxSeq, 10)
- ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
- defer cancel()
-
- for {
- getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
- if err != nil {
- if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) {
- _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr)
- if err == nil {
- continue
- }
- if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) {
- continue
- }
- return 0, err
- } else {
- return 0, err
- }
- }
-
- if getResp.Node == nil {
- continue
- }
- prevSeqStr := getResp.Node.Value
- prevSeq, err := strconv.ParseUint(prevSeqStr, 10, 64)
- if err != nil {
- return 0, err
- }
- if prevSeq >= maxSeq {
- return prevSeq, nil
- }
-
- _, err = kvApi.Set(ctx, EtcdKeySequence, maxSeqStr, &client.SetOptions{PrevValue: prevSeqStr})
- if err != nil {
- return 0, err
- }
- }
-}
-
-func openSequenceFile(file string) (*os.File, error) {
- _, err := os.Stat(file)
- if os.IsNotExist(err) {
- fid, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return nil, err
- }
- if err := writeSequenceFile(fid, 1, 0); err != nil {
- return nil, err
- }
- return fid, nil
- } else {
- return os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
- }
-}
-
-/*
-read sequence and step from sequence file
-*/
-func readSequenceFile(file *os.File) (uint64, uint64, error) {
- sequence := make([]byte, FileMaxSequenceLength)
- size, err := file.ReadAt(sequence, 0)
- if (err != nil) && (err != io.EOF) {
- err := fmt.Errorf("cannot read file %s, %v", file.Name(), err)
- return 0, 0, err
- }
- sequence = sequence[0:size]
- seqs := strings.Split(string(sequence), ":")
- maxId, err := strconv.ParseUint(seqs[0], 10, 64)
- if err != nil {
- return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
- }
-
- if len(seqs) > 1 {
- step, err := strconv.ParseUint(seqs[1], 10, 64)
- if err != nil {
- return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
- }
- return maxId, step, nil
- }
-
- return maxId, 0, nil
-}
-
-/**
-write the sequence and step to sequence file
-*/
-func writeSequenceFile(file *os.File, sequence, step uint64) error {
- _ = step
- seqStr := fmt.Sprintf("%d:%d", sequence, sequence)
- if _, err := file.Seek(0, 0); err != nil {
- err = fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
- return err
- }
- if err := file.Truncate(0); err != nil {
- return fmt.Errorf("truncate sequence file faield : %v", err)
- }
- if _, err := file.WriteString(seqStr); err != nil {
- return fmt.Errorf("write file %s failed, %v", file.Name(), err)
- }
- if err := file.Sync(); err != nil {
- return fmt.Errorf("flush file %s failed, %v", file.Name(), err)
- }
- return nil
-}
-
-// the UT helper method
-// func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
-// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
-// defer cancel()
-// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
-// if err != nil {
-// return err
-// }
-// return nil
-// }
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 8de01abf7..3b3b1c94b 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -28,7 +28,6 @@ import (
const (
SequencerType = "master.sequencer.type"
- SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
)
@@ -286,15 +285,6 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
seqType := strings.ToLower(v.GetString(SequencerType))
glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
- case "etcd":
- var err error
- urls := v.GetString(SequencerEtcdUrls)
- glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
- seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
- if err != nil {
- glog.Error(err)
- seq = nil
- }
case "snowflake":
var err error
snowflakeId := v.GetInt(SequencerSnowflakeId)