diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-25 11:21:19 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-25 11:21:19 -0700 |
| commit | 707ff536a38ce1408d3ce35b2afaa43513718965 (patch) | |
| tree | 8e198fce9639e4a8b5b15645c556193ac84c707b /weed | |
| parent | d6a066495b2b335f1cd20005e4a48f1f53a42e65 (diff) | |
| parent | 8b3756b815f86c1ef465631a8b20223c4abe7156 (diff) | |
| download | seaweedfs-707ff536a38ce1408d3ce35b2afaa43513718965.tar.xz seaweedfs-707ff536a38ce1408d3ce35b2afaa43513718965.zip | |
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/scaffold/filer.toml | 6 | ||||
| -rw-r--r-- | weed/command/shell.go | 30 | ||||
| -rw-r--r-- | weed/filer/etcd/etcd_store.go | 82 | ||||
| -rw-r--r-- | weed/filer/etcd/etcd_store_test.go | 2 | ||||
| -rw-r--r-- | weed/mount/weedfs_attr.go | 2 | ||||
| -rw-r--r-- | weed/s3api/s3api_objects_list_handlers.go | 14 | ||||
| -rw-r--r-- | weed/server/volume_grpc_admin.go | 3 | ||||
| -rw-r--r-- | weed/stats/metrics.go | 9 | ||||
| -rw-r--r-- | weed/topology/volume_growth_test.go | 110 | ||||
| -rw-r--r-- | weed/topology/volume_layout.go | 12 | ||||
| -rw-r--r-- | weed/weed.go | 15 |
11 files changed, 230 insertions, 55 deletions
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 231e7510a..30a9cae51 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -270,6 +270,12 @@ username = "" password = "" key_prefix = "seaweedfs." timeout = "3s" +# Set the CA certificate path +tls_ca_file="" +# Set the client certificate path +tls_client_crt_file="" +# Set the client private key path +tls_client_key_file="" [mongodb] enabled = false diff --git a/weed/command/shell.go b/weed/command/shell.go index f562f624e..f78ba89fc 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -19,7 +19,7 @@ func init() { cmdShell.Run = runShell // break init cycle shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333") shellOptions.FilerGroup = cmdShell.Flag.String("filerGroup", "", "filerGroup for the filers") - shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888") + shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port for initial connection, e.g. localhost:8888") shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml") } @@ -30,32 +30,36 @@ var cmdShell = &Command{ Generate shell.toml via "weed scaffold -config=shell" - `, +`, } func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + shellOptions.Directory = "/" + + util.LoadConfiguration("shell", false) + viper := util.GetViper() + cluster := viper.GetString("cluster.default") + if *shellCluster != "" { + cluster = *shellCluster + } if *shellOptions.Masters == "" { - util.LoadConfiguration("shell", false) - v := util.GetViper() - cluster := v.GetString("cluster.default") - if *shellCluster != "" { - cluster = *shellCluster - } if cluster == "" { *shellOptions.Masters = "localhost:9333" } else { - *shellOptions.Masters = v.GetString("cluster." + cluster + ".master") - *shellInitialFiler = v.GetString("cluster." + cluster + ".filer") - fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler) + *shellOptions.Masters = viper.GetString("cluster." + cluster + ".master") } } - shellOptions.FilerAddress = pb.ServerAddress(*shellInitialFiler) - shellOptions.Directory = "/" + filerAddress := *shellInitialFiler + if filerAddress == "" && cluster != "" { + filerAddress = viper.GetString("cluster." + cluster + ".filer") + } + shellOptions.FilerAddress = pb.ServerAddress(filerAddress) + fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, shellOptions.FilerAddress) shell.RunShell(shellOptions) diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 9f96405a9..fa2a72ca5 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -1,9 +1,10 @@ package etcd import ( - "bytes" "context" + "crypto/tls" "fmt" + "go.etcd.io/etcd/client/pkg/v3/transport" "strings" "time" @@ -26,49 +27,78 @@ func init() { type EtcdStore struct { client *clientv3.Client etcdKeyPrefix string + timeout time.Duration } func (store *EtcdStore) GetName() string { return "etcd" } -func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { +func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) error { + configuration.SetDefault(prefix+"servers", "localhost:2379") + configuration.SetDefault(prefix+"timeout", "3s") + servers := configuration.GetString(prefix + "servers") - if servers == "" { - servers = "localhost:2379" - } username := configuration.GetString(prefix + "username") password := configuration.GetString(prefix + "password") store.etcdKeyPrefix = configuration.GetString(prefix + "key_prefix") - timeout := configuration.GetString(prefix + "timeout") - if timeout == "" { - timeout = "3s" + timeoutStr := configuration.GetString(prefix + "timeout") + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return fmt.Errorf("parse etcd store timeout: %v", err) } + store.timeout = timeout + + certFile := configuration.GetString(prefix + "tls_client_crt_file") + keyFile := configuration.GetString(prefix + "tls_client_key_file") + caFile := configuration.GetString(prefix + "tls_ca_file") - return store.initialize(servers, username, password, timeout) + var tlsConfig *tls.Config + if caFile != "" { + tlsInfo := transport.TLSInfo{ + CertFile: certFile, + KeyFile: keyFile, + TrustedCAFile: caFile, + } + var err error + tlsConfig, err = tlsInfo.ClientConfig() + if err != nil { + return fmt.Errorf("TLS client configuration error: %v", err) + } + } + + return store.initialize(servers, username, password, store.timeout, tlsConfig) } -func (store *EtcdStore) initialize(servers string, username string, password string, timeout string) (err error) { +func (store *EtcdStore) initialize(servers, username, password string, timeout time.Duration, tlsConfig *tls.Config) error { glog.Infof("filer store etcd: %s", servers) - to, err := time.ParseDuration(timeout) - if err != nil { - return fmt.Errorf("parse timeout %s: %s", timeout, err) - } - - store.client, err = clientv3.New(clientv3.Config{ + client, err := clientv3.New(clientv3.Config{ Endpoints: strings.Split(servers, ","), Username: username, Password: password, - DialTimeout: to, + DialTimeout: timeout, + TLS: tlsConfig, }) if err != nil { return fmt.Errorf("connect to etcd %s: %s", servers, err) } - return + ctx, cancel := context.WithTimeout(context.Background(), store.timeout) + defer cancel() + + resp, err := client.Status(ctx, client.Endpoints()[0]) + if err != nil { + client.Close() + return fmt.Errorf("error checking etcd connection: %s", err) + } + + glog.V(0).Infof("сonnection to etcd has been successfully verified. etcd version: %s", resp.Version) + store.client = client + + return nil } func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) { @@ -148,26 +178,20 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_ } func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed -} - -func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - directoryPrefix := genDirectoryKeyPrefix(dirPath, "") + directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) lastFileStart := directoryPrefix if startFileName != "" { lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName) } resp, err := store.client.Get(ctx, store.etcdKeyPrefix+string(lastFileStart), - clientv3.WithFromKey(), clientv3.WithLimit(limit+1)) + clientv3.WithRange(clientv3.GetPrefixRangeEnd(store.etcdKeyPrefix+string(directoryPrefix))), + clientv3.WithLimit(limit+1)) if err != nil { return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } for _, kv := range resp.Kvs { - if !bytes.HasPrefix(kv.Key, directoryPrefix) { - break - } fileName := getNameFromKey(kv.Key) if fileName == "" { continue @@ -196,6 +220,10 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u return lastFileName, err } +func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + func genKey(dirPath, fileName string) (key []byte) { key = []byte(dirPath) key = append(key, DIR_FILE_SEPARATOR) diff --git a/weed/filer/etcd/etcd_store_test.go b/weed/filer/etcd/etcd_store_test.go index 31e451e00..6abb74697 100644 --- a/weed/filer/etcd/etcd_store_test.go +++ b/weed/filer/etcd/etcd_store_test.go @@ -10,7 +10,7 @@ func TestStore(t *testing.T) { // to set up local env if false { store := &EtcdStore{} - store.initialize("localhost:2379", "", "", "3s") + store.initialize("localhost:2379", "", "", 3, nil) store_test.TestFilerStore(t, store) } } diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index 9cd643d95..c389e0627 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -145,7 +145,6 @@ func (wfs *WFS) setRootAttr(out *fuse.AttrOut) { func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry, calculateSize bool) { out.Ino = inode - out.Blocks = (out.Size + blockSize - 1) / blockSize setBlksize(out, blockSize) if entry == nil { return @@ -159,6 +158,7 @@ func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.E if entry.FileMode()&os.ModeSymlink != 0 { out.Size = uint64(len(entry.Attributes.SymlinkTarget)) } + out.Blocks = (out.Size + blockSize - 1) / blockSize out.Mtime = uint64(entry.Attributes.Mtime) out.Ctime = uint64(entry.Attributes.Mtime) out.Atime = uint64(entry.Attributes.Mtime) diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index b2ad915b9..f332da856 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -167,12 +167,16 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m if delimiter != "" { // keys that contain the same string between the prefix and the first occurrence of the delimiter are grouped together as a commonPrefix. // extract the string between the prefix and the delimiter and add it to the commonPrefixes if it's unique. - fullPath := fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):] - delimitedPath := strings.SplitN(fullPath, delimiter, 2) - if len(delimitedPath) == 2 { + undelimitedPath := fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):] + + // take into account a prefix if supplied while delimiting. + undelimitedPath = strings.TrimPrefix(undelimitedPath, originalPrefix) - // S3 clients expect the delimited prefix to contain the delimiter. - delimitedPrefix := delimitedPath[0] + delimiter + delimitedPath := strings.SplitN(undelimitedPath, delimiter, 2) + + if len(delimitedPath) == 2 { + // S3 clients expect the delimited prefix to contain the delimiter and prefix. + delimitedPrefix := originalPrefix + delimitedPath[0] + delimiter for i := range commonPrefixes { if commonPrefixes[i].Prefix == delimitedPrefix { diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 8cd6ff949..abd39b582 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -236,6 +236,9 @@ func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb. if v == nil { return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } + if v.DataBackend == nil { + return nil, fmt.Errorf("volume %d data backend not found", req.VolumeId) + } volumeSize, _, _ := v.DataBackend.GetStat() resp.IsReadOnly = v.IsReadOnly() diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 49a3b090b..f61f68e08 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -70,6 +70,14 @@ var ( Help: "replica placement mismatch", }, []string{"collection", "id"}) + MasterVolumeLayout = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "master", + Name: "volume_layout_total", + Help: "Number of volumes in volume layouts", + }, []string{"collection", "replica", "type"}) + MasterLeaderChangeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, @@ -259,6 +267,7 @@ func init() { Gather.MustRegister(MasterReceivedHeartbeatCounter) Gather.MustRegister(MasterLeaderChangeCounter) Gather.MustRegister(MasterReplicaPlacementMismatch) + Gather.MustRegister(MasterVolumeLayout) Gather.MustRegister(FilerRequestCounter) Gather.MustRegister(FilerHandlerCounter) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index c3ba65d39..a3473c677 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -3,6 +3,7 @@ package topology import ( "encoding/json" "fmt" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "testing" "github.com/seaweedfs/seaweedfs/weed/sequence" @@ -88,19 +89,35 @@ func setup(topologyLayout string) *Topology { dcMap := dcValue.(map[string]interface{}) topo.LinkChildNode(dc) for rackKey, rackValue := range dcMap { - rack := NewRack(rackKey) + dcRack := NewRack(rackKey) rackMap := rackValue.(map[string]interface{}) - dc.LinkChildNode(rack) + dc.LinkChildNode(dcRack) for serverKey, serverValue := range rackMap { server := NewDataNode(serverKey) serverMap := serverValue.(map[string]interface{}) - rack.LinkChildNode(server) + if ip, ok := serverMap["ip"]; ok { + server.Ip = ip.(string) + } + dcRack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ Id: needle.VolumeId(int64(m["id"].(float64))), Size: uint64(m["size"].(float64)), - Version: needle.CurrentVersion} + Version: needle.CurrentVersion, + } + if mVal, ok := m["collection"]; ok { + vi.Collection = mVal.(string) + } + if mVal, ok := m["replication"]; ok { + rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string)) + vi.ReplicaPlacement = rp + } + if vi.ReplicaPlacement != nil { + vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType) + vl.RegisterVolume(&vi, server) + vl.setVolumeWritable(vi.Id) + } server.AddOrUpdateVolume(vi) } @@ -346,3 +363,88 @@ func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) { fmt.Printf("%s : %d\n", k, v) } } + +var topologyLayout4 = ` +{ + "dc1":{ + "rack1":{ + "serverdc111":{ + "ip": "127.0.0.1", + "volumes":[ + {"id":1, "size":12312, "collection":"test", "replication":"001"}, + {"id":2, "size":12312, "collection":"test", "replication":"100"}, + {"id":4, "size":12312, "collection":"test", "replication":"100"}, + {"id":6, "size":12312, "collection":"test", "replication":"010"} + ], + "limit":100 + } + } + }, + "dc2":{ + "rack1":{ + "serverdc211":{ + "ip": "127.0.0.2", + "volumes":[ + {"id":2, "size":12312, "collection":"test", "replication":"100"}, + {"id":3, "size":12312, "collection":"test", "replication":"010"}, + {"id":5, "size":12312, "collection":"test", "replication":"001"}, + {"id":6, "size":12312, "collection":"test", "replication":"010"} + ], + "limit":100 + } + } + }, + "dc3":{ + "rack1":{ + "serverdc311":{ + "ip": "127.0.0.3", + "volumes":[ + {"id":1, "size":12312, "collection":"test", "replication":"001"}, + {"id":3, "size":12312, "collection":"test", "replication":"010"}, + {"id":4, "size":12312, "collection":"test", "replication":"100"}, + {"id":5, "size":12312, "collection":"test", "replication":"001"} + ], + "limit":100 + } + } + } +} +` + +func TestPickForWrite(t *testing.T) { + topo := setup(topologyLayout4) + volumeGrowOption := &VolumeGrowOption{ + Collection: "test", + DataCenter: "", + Rack: "", + DataNode: "", + } + for _, rpStr := range []string{"001", "010", "100"} { + rp, _ := super_block.NewReplicaPlacementFromString(rpStr) + vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType) + volumeGrowOption.ReplicaPlacement = rp + for _, dc := range []string{"", "dc1", "dc2", "dc3"} { + volumeGrowOption.DataCenter = dc + for _, r := range []string{""} { + volumeGrowOption.Rack = r + for _, dn := range []string{""} { + if dc == "" && dn != "" { + continue + } + volumeGrowOption.DataNode = dn + fileId, count, _, _, err := topo.PickForWrite(1, volumeGrowOption, vl) + if err != nil { + fmt.Println(dc, r, dn, "pick for write error :", err) + t.Fail() + } else if count == 0 { + fmt.Println(dc, r, dn, "pick for write count is zero") + t.Fail() + } else if len(fileId) == 0 { + fmt.Println(dc, r, dn, "pick for write file id is empty") + t.Fail() + } + } + } + } + } +} diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 6b5d0b8da..278978292 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -3,6 +3,7 @@ package topology import ( "errors" "fmt" + "github.com/seaweedfs/seaweedfs/weed/stats" "math/rand" "sync" "sync/atomic" @@ -349,18 +350,21 @@ func (vl *VolumeLayout) DoneGrowRequest() { } func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool { - active, crowded := vl.GetActiveVolumeCount(option) + total, active, crowded := vl.GetActiveVolumeCount(option) + stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.ReplicaPlacement.String(), "total").Set(float64(total)) + stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.ReplicaPlacement.String(), "active").Set(float64(active)) + stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.ReplicaPlacement.String(), "crowded").Set(float64(crowded)) //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high) return active <= crowded } -func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) { +func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, active, crowded int) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() - if option.DataCenter == "" { - return len(vl.writables), len(vl.crowded) + return len(vl.writables), len(vl.writables), len(vl.crowded) } + total = len(vl.writables) for _, v := range vl.writables { for _, dn := range vl.vid2location[v].list { if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { diff --git a/weed/weed.go b/weed/weed.go index 773d86274..a821cd72f 100644 --- a/weed/weed.go +++ b/weed/weed.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "text/template" + "time" "unicode" "unicode/utf8" @@ -16,6 +17,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" + "github.com/getsentry/sentry-go" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -49,6 +51,19 @@ func main() { glog.MaxFileCount = 5 flag.Usage = usage + err := sentry.Init(sentry.ClientOptions{ + SampleRate: 0.1, + EnableTracing: true, + TracesSampleRate: 0.1, + ProfilesSampleRate: 0.1, + }) + if err != nil { + fmt.Fprintf(os.Stderr, "sentry.Init: %v", err) + } + // Flush buffered events before the program terminates. + // Set the timeout to the maximum duration the program can afford to wait. + defer sentry.Flush(2 * time.Second) + if command.AutocompleteMain(commands) { return } |
