aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-16 11:42:23 -0700
committerchrislu <chris.lu@gmail.com>2024-03-16 11:42:23 -0700
commit6a61b54f29df704bbe2b8fb69cb234cac530bd48 (patch)
treeb21c2f6923941d8287acdc9a080b8de21d59f848 /weed/server
parent205829fa22216228be40effb9d684aa7900ded57 (diff)
parent27bb38228b647e34fe20a6016fa04c829138c272 (diff)
downloadseaweedfs-6a61b54f29df704bbe2b8fb69cb234cac530bd48.tar.xz
seaweedfs-6a61b54f29df704bbe2b8fb69cb234cac530bd48.zip
Merge branch 'mq-subscribe'
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go4
-rw-r--r--weed/server/filer_grpc_server_dlm.go21
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go21
-rw-r--r--weed/server/filer_server.go17
4 files changed, 39 insertions, 24 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index f32273f26..eeb031cd1 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -245,8 +245,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
fullpath := util.NewFullPath(req.Directory, req.EntryName)
lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
- lock := lockClient.NewLock(string(fullpath), string(fs.option.Host))
- defer lock.StopLock()
+ lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
+ defer lock.StopShortLivedLock()
var offset int64 = 0
entry, err := fs.filer.FindEntry(ctx, fullpath)
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go
index 1f5644680..5ec147835 100644
--- a/weed/server/filer_grpc_server_dlm.go
+++ b/weed/server/filer_grpc_server_dlm.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -18,7 +19,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
var movedTo pb.ServerAddress
expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano()
- resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
+ resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
glog.V(3).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo)
if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved {
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -31,7 +32,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
})
if err == nil {
resp.RenewToken = secondResp.RenewToken
- } else {
+ resp.LockOwner = secondResp.LockOwner
resp.Error = secondResp.Error
}
return err
@@ -42,7 +43,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
resp.Error = fmt.Sprintf("%v", err)
}
if movedTo != "" {
- resp.MovedTo = string(movedTo)
+ resp.LockHostMovedTo = string(movedTo)
}
return resp, nil
@@ -81,10 +82,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo
func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) {
owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
- if err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- }
- if !req.IsMoved && movedTo != "" {
+ if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound {
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
Name: req.Name,
@@ -100,6 +98,15 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock
return nil, err
}
}
+
+ if owner == "" {
+ glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
+ return nil, status.Error(codes.NotFound, fmt.Sprintf("lock %s not found", req.Name))
+ }
+ if err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+
return &filer_pb.FindLockOwnerResponse{
Owner: owner,
}, nil
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index eb69a6aeb..436c4158f 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"strings"
+ "sync/atomic"
"time"
"google.golang.org/protobuf/proto"
@@ -32,7 +33,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch)
- lastReadTime := time.Unix(0, req.SinceNs)
+ lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
@@ -57,7 +58,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
+ lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@@ -113,7 +114,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch)
}()
- lastReadTime := time.Unix(0, req.SinceNs)
+ lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId)
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
@@ -138,7 +139,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}
if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
+ lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond)
@@ -150,7 +151,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.listenersLock.Lock()
+ atomic.AddInt64(&fs.listenersWaits, 1)
fs.listenersCond.Wait()
+ atomic.AddInt64(&fs.listenersWaits, -1)
fs.listenersLock.Unlock()
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
return false
@@ -178,19 +181,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}
-func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error {
- return func(logEntry *filer_pb.LogEntry) error {
+func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType {
+ return func(logEntry *filer_pb.LogEntry) (bool, error) {
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
- return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
}
if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
- return err
+ return false, err
}
- return nil
+ return false, nil
}
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 9428c2edf..356761f30 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -7,6 +7,7 @@ import (
"os"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -76,7 +77,13 @@ type FilerOption struct {
}
type FilerServer struct {
- inFlightDataSize int64
+ inFlightDataSize int64
+ listenersWaits int64
+
+ // notifying clients
+ listenersLock sync.Mutex
+ listenersCond *sync.Cond
+
inFlightDataLimitCond *sync.Cond
filer_pb.UnimplementedSeaweedFilerServer
@@ -90,10 +97,6 @@ type FilerServer struct {
metricsAddress string
metricsIntervalSec int
- // notifying clients
- listenersLock sync.Mutex
- listenersCond *sync.Cond
-
// track known metadata listeners
knownListenersLock sync.Mutex
knownListeners map[int32]int32
@@ -135,7 +138,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
v.SetDefault("filer.options.max_file_name_length", 255)
maxFilenameLength := v.GetUint32("filer.options.max_file_name_length")
fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, maxFilenameLength, func() {
- fs.listenersCond.Broadcast()
+ if atomic.LoadInt64(&fs.listenersWaits) > 0 {
+ fs.listenersCond.Broadcast()
+ }
})
fs.filer.Cipher = option.Cipher
// we do not support IP whitelist right now