diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-16 11:42:23 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-16 11:42:23 -0700 |
| commit | 6a61b54f29df704bbe2b8fb69cb234cac530bd48 (patch) | |
| tree | b21c2f6923941d8287acdc9a080b8de21d59f848 /weed/server | |
| parent | 205829fa22216228be40effb9d684aa7900ded57 (diff) | |
| parent | 27bb38228b647e34fe20a6016fa04c829138c272 (diff) | |
| download | seaweedfs-6a61b54f29df704bbe2b8fb69cb234cac530bd48.tar.xz seaweedfs-6a61b54f29df704bbe2b8fb69cb234cac530bd48.zip | |
Merge branch 'mq-subscribe'
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_dlm.go | 21 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 21 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 17 |
4 files changed, 39 insertions, 24 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index f32273f26..eeb031cd1 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -245,8 +245,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo fullpath := util.NewFullPath(req.Directory, req.EntryName) lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host) - lock := lockClient.NewLock(string(fullpath), string(fs.option.Host)) - defer lock.StopLock() + lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host)) + defer lock.StopShortLivedLock() var offset int64 = 0 entry, err := fs.filer.FindEntry(ctx, fullpath) diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index 1f5644680..5ec147835 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -18,7 +19,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe var movedTo pb.ServerAddress expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano() - resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner) + resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner) glog.V(3).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo) if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -31,7 +32,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe }) if err == nil { resp.RenewToken = secondResp.RenewToken - } else { + resp.LockOwner = secondResp.LockOwner resp.Error = secondResp.Error } return err @@ -42,7 +43,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe resp.Error = fmt.Sprintf("%v", err) } if movedTo != "" { - resp.MovedTo = string(movedTo) + resp.LockHostMovedTo = string(movedTo) } return resp, nil @@ -81,10 +82,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) { owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - if !req.IsMoved && movedTo != "" { + if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ Name: req.Name, @@ -100,6 +98,15 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock return nil, err } } + + if owner == "" { + glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err) + return nil, status.Error(codes.NotFound, fmt.Sprintf("lock %s not found", req.Name)) + } + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &filer_pb.FindLockOwnerResponse{ Owner: owner, }, nil diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index eb69a6aeb..436c4158f 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/stats" "strings" + "sync/atomic" "time" "google.golang.org/protobuf/proto" @@ -32,7 +33,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) - lastReadTime := time.Unix(0, req.SinceNs) + lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) @@ -57,7 +58,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) + lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -113,7 +114,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch) }() - lastReadTime := time.Unix(0, req.SinceNs) + lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId) eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) @@ -138,7 +139,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq } if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) + lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) @@ -150,7 +151,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.listenersLock.Lock() + atomic.AddInt64(&fs.listenersWaits, 1) fs.listenersCond.Wait() + atomic.AddInt64(&fs.listenersWaits, -1) fs.listenersLock.Unlock() if !fs.hasClient(req.ClientId, req.ClientEpoch) { return false @@ -178,19 +181,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq } -func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error { - return func(logEntry *filer_pb.LogEntry) error { +func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType { + return func(logEntry *filer_pb.LogEntry) (bool, error) { event := &filer_pb.SubscribeMetadataResponse{} if err := proto.Unmarshal(logEntry.Data, event); err != nil { glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) } if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { - return err + return false, err } - return nil + return false, nil } } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 9428c2edf..356761f30 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -7,6 +7,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -76,7 +77,13 @@ type FilerOption struct { } type FilerServer struct { - inFlightDataSize int64 + inFlightDataSize int64 + listenersWaits int64 + + // notifying clients + listenersLock sync.Mutex + listenersCond *sync.Cond + inFlightDataLimitCond *sync.Cond filer_pb.UnimplementedSeaweedFilerServer @@ -90,10 +97,6 @@ type FilerServer struct { metricsAddress string metricsIntervalSec int - // notifying clients - listenersLock sync.Mutex - listenersCond *sync.Cond - // track known metadata listeners knownListenersLock sync.Mutex knownListeners map[int32]int32 @@ -135,7 +138,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) v.SetDefault("filer.options.max_file_name_length", 255) maxFilenameLength := v.GetUint32("filer.options.max_file_name_length") fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, maxFilenameLength, func() { - fs.listenersCond.Broadcast() + if atomic.LoadInt64(&fs.listenersWaits) > 0 { + fs.listenersCond.Broadcast() + } }) fs.filer.Cipher = option.Cipher // we do not support IP whitelist right now |
