| author | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
|---|---|---|
| committer | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
| commit | d861cbd81b75b6684c971ac00e33685e6575b833 (patch) | |
| tree | 301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/messaging | |
| parent | 70da715d8d917527291b35fb069fac077d17b868 (diff) | |
| parent | 4ee58922eff61a5a4ca29c0b4829b097a498549e (diff) | |
| download | seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip | |
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 113 |
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server.go | 37 |
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_discovery.go | 116 |
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 112 |
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 177 |
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 114 |
| -rw-r--r-- | weed/messaging/broker/consistent_distribution.go | 38 |
| -rw-r--r-- | weed/messaging/broker/consistent_distribution_test.go | 32 |
| -rw-r--r-- | weed/messaging/broker/topic_manager.go | 124 |
| -rw-r--r-- | weed/messaging/msgclient/chan_config.go | 5 |
| -rw-r--r-- | weed/messaging/msgclient/chan_pub.go | 76 |
| -rw-r--r-- | weed/messaging/msgclient/chan_sub.go | 85 |
| -rw-r--r-- | weed/messaging/msgclient/client.go | 55 |
| -rw-r--r-- | weed/messaging/msgclient/config.go | 63 |
| -rw-r--r-- | weed/messaging/msgclient/publisher.go | 118 |
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 120 |
16 files changed, 1385 insertions, 0 deletions
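This merge brings in the first cut of SeaweedFS's pub/sub messaging: a `broker` package (gRPC server, topic discovery, per-partition log buffers) and a `msgclient` package (raw partitioned publishers/subscribers plus Go-channel-style pub/sub). A minimal round-trip sketch against the `msgclient` API introduced below — the broker address, channel name, and subscriber id are illustrative, not part of the commit:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/messaging/msgclient"
)

func main() {
	mc := msgclient.NewMessagingClient("localhost:17777")

	// consumer: messages arrive on a plain Go channel
	sub, err := mc.NewSubChannel("demo-subscriber", "test-channel")
	if err != nil {
		panic(err)
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		for m := range sub.Channel() {
			fmt.Printf("received: %s\n", string(m))
		}
	}()

	// producer: publish a few bytes, then close the channel
	pub, err := mc.NewPubChannel("test-channel")
	if err != nil {
		panic(err)
	}
	if err := pub.Publish([]byte("hello")); err != nil {
		panic(err)
	}
	pub.Close() // sends IsClose; the broker relays it, which closes sub.Channel()
	<-done
}
```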
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
new file mode 100644
index 000000000..8e5b56fd0
--- /dev/null
+++ b/weed/messaging/broker/broker_append.go
@@ -0,0 +1,113 @@
+package broker
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
+
+	assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
+	if err2 != nil {
+		return err2
+	}
+
+	dir, name := util.FullPath(targetFile).DirAndName()
+
+	// append the chunk
+	if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+		request := &filer_pb.AppendToEntryRequest{
+			Directory: dir,
+			EntryName: name,
+			Chunks:    []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
+		}
+
+		_, err := client.AppendToEntry(context.Background(), request)
+		if err != nil {
+			glog.V(0).Infof("append to file %v: %v", request, err)
+			return err
+		}
+
+		return nil
+	}); err != nil {
+		return fmt.Errorf("append to file %v: %v", targetFile, err)
+	}
+
+	return nil
+}
+
+func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+
+	var assignResult = &operation.AssignResult{}
+
+	// assign a volume location
+	if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+		request := &filer_pb.AssignVolumeRequest{
+			Count:       1,
+			Replication: topicConfig.Replication,
+			Collection:  topicConfig.Collection,
+		}
+
+		resp, err := client.AssignVolume(context.Background(), request)
+		if err != nil {
+			glog.V(0).Infof("assign volume failure %v: %v", request, err)
+			return err
+		}
+		if resp.Error != "" {
+			return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+		}
+
+		assignResult.Auth = security.EncodedJwt(resp.Auth)
+		assignResult.Fid = resp.FileId
+		assignResult.Url = resp.Url
+		assignResult.PublicUrl = resp.PublicUrl
+		assignResult.Count = uint64(resp.Count)
+
+		return nil
+	}); err != nil {
+		return nil, nil, err
+	}
+
+	// upload data
+	targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+	uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
+	if err != nil {
+		return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+	}
+	// println("uploaded to", targetUrl)
+	return assignResult, uploadResult, nil
+}
+
+var _ = filer_pb.FilerClient(&MessageBroker{})
+
+func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+
+	for _, filer := range broker.option.Filers {
+		if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+			if err == io.EOF {
+				return
+			}
+			glog.V(0).Infof("fail to connect to %s: %v", filer, err)
+		} else {
+			break
+		}
+	}
+
+	return
+
+}
+
+func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
+	return location.Url
+}
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
new file mode 100644
index 000000000..ba141fdd0
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server.go
@@ -0,0 +1,37 @@
+package broker
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
+	panic("implement me")
+}
+
+func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
+	resp := &messaging_pb.DeleteTopicResponse{}
+	dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
+	if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
+		return nil, err
+	} else if exists {
+		err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil)
+	}
+	return resp, nil
+}
+
+func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
+	panic("implement me")
+}
+
+func genTopicDir(namespace, topic string) string {
+	return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic)
+}
+
+func genTopicDirEntry(namespace, topic string) (dir, entry string) {
+	return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic
+}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
new file mode 100644
index 000000000..3c14f3220
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -0,0 +1,116 @@
+package broker
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+/*
+Topic discovery:
+
+When a publisher or subscriber connects, it asks for the whole broker list and runs consistent hashing to find its broker.
+
+The broker checks with its peers whether the topic is already hosted by some other broker; if that broker is alive and acknowledged as alive, it redirects to it. Otherwise, it hosts the topic itself.
+
+So if the publishers and subscribers connect around the same time, they all reach the same broker, and everyone is happy.
+If one of them connects very late, after the system topology has changed quite a bit with servers added or removed, checking the peers helps.
+
+*/
+
+func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
+
+	t := &messaging_pb.FindBrokerResponse{}
+	var peers []string
+
+	targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
+
+	for _, filer := range broker.option.Filers {
+		err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+			resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
+				Resource: targetTopicPartition,
+			})
+			if err != nil {
+				return err
+			}
+			if resp.Found && len(resp.Resources) > 0 {
+				t.Broker = resp.Resources[0].GrpcAddresses
+				return nil
+			}
+			for _, b := range resp.Resources {
+				peers = append(peers, b.GrpcAddresses)
+			}
+			return nil
+		})
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	t.Broker = PickMember(peers, []byte(targetTopicPartition))
+
+	return t, nil
+
+}
+
+func (broker *MessageBroker) checkFilers() {
+
+	// contact a filer about masters
+	var masters []string
+	found := false
+	for !found {
+		for _, filer := range broker.option.Filers {
+			err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+				resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+				if err != nil {
+					return err
+				}
+				masters = append(masters, resp.Masters...)
+				return nil
+			})
+			if err == nil {
+				found = true
+				break
+			}
+			glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
+			time.Sleep(time.Second)
+		}
+	}
+	glog.V(0).Infof("received master list: %s", masters)
+
+	// contact each master for filers
+	var filers []string
+	found = false
+	for !found {
+		for _, master := range masters {
+			err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+				resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+					ClientType: "filer",
+				})
+				if err != nil {
+					return err
+				}
+
+				filers = append(filers, resp.GrpcAddresses...)
+
+				return nil
+			})
+			if err == nil {
+				found = true
+				break
+			}
+			glog.V(0).Infof("failed to list filers: %v", err)
+			time.Sleep(time.Second)
+		}
+	}
+	glog.V(0).Infof("received filer list: %s", filers)
+
+	broker.option.Filers = filers
+
+}
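When no filer knows an owner for the partition, `FindBroker` falls back to `PickMember`, so every client that hashes the same topic-partition string independently picks the same broker. A small sketch of that deterministic pick (the peer addresses are made up):

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/messaging/broker"
)

func main() {
	// peer brokers, as they would be advertised via the filers
	peers := []string{"10.0.0.1:17777", "10.0.0.2:17777", "10.0.0.3:17777"}

	// "chan/test-channel_00": the same TopicPartitionFmt string the broker hashes
	key := fmt.Sprintf(broker.TopicPartitionFmt, "chan", "test-channel", 0)

	// deterministic: every client picks the same broker for this partition
	fmt.Println(broker.PickMember(peers, []byte(key)))
}
```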
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
new file mode 100644
index 000000000..6e6b723d1
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -0,0 +1,112 @@
+package broker
+
+import (
+	"crypto/md5"
+	"fmt"
+	"io"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
+
+	// process initial request
+	in, err := stream.Recv()
+	if err == io.EOF {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	// TODO look it up
+	topicConfig := &messaging_pb.TopicConfiguration{
+		// IsTransient: true,
+	}
+
+	// send init response
+	initResponse := &messaging_pb.PublishResponse{
+		Config:   nil,
+		Redirect: nil,
+	}
+	err = stream.Send(initResponse)
+	if err != nil {
+		return err
+	}
+	if initResponse.Redirect != nil {
+		return nil
+	}
+
+	// get lock
+	tp := TopicPartition{
+		Namespace: in.Init.Namespace,
+		Topic:     in.Init.Topic,
+		Partition: in.Init.Partition,
+	}
+
+	tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
+	md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
+	// println("chan data stored under", tpDir, "as", md5File)
+
+	if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
+		return fmt.Errorf("channel is already closed")
+	}
+
+	tl := broker.topicManager.RequestLock(tp, topicConfig, true)
+	defer broker.topicManager.ReleaseLock(tp, true)
+
+	md5hash := md5.New()
+	// process each message
+	for {
+		// println("recv")
+		in, err := stream.Recv()
+		// glog.V(0).Infof("received %v err: %v", in, err)
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		if in.Data == nil {
+			continue
+		}
+
+		// fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
+
+		data, err := proto.Marshal(in.Data)
+		if err != nil {
+			glog.Errorf("marshal error: %v\n", err)
+			continue
+		}
+
+		tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
+
+		if in.Data.IsClose {
+			// println("server received closing")
+			break
+		}
+
+		md5hash.Write(in.Data.Value)
+
+	}
+
+	if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
+		glog.V(0).Infof("err writing %s: %v", md5File, err)
+	}
+
+	// fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
+
+	// send the close ack
+	// println("server send ack closing")
+	if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
+		glog.V(0).Infof("err sending close response: %v", err)
+	}
+	return nil
+
+}
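The publish stream ends with an explicit handshake: the publisher sends a `Message` with `IsClose: true`, the broker persists a `p<NN>.md5` marker (whose existence makes later publishes fail with "channel is already closed") and acks with `PublishResponse{IsClosed: true}`. A client-side sketch of that handshake, assuming `stream` has already completed the init exchange (it mirrors `PubChannel.Close` further down in this diff; the package name is illustrative):

```go
package example

import (
	"io"

	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)

// closePublish performs the publisher side of the close handshake.
func closePublish(stream messaging_pb.SeaweedMessaging_PublishClient) error {
	// tell the broker this channel is done
	if err := stream.Send(&messaging_pb.PublishRequest{
		Data: &messaging_pb.Message{IsClose: true},
	}); err != nil {
		return err
	}
	// wait for the broker's close ack (PublishResponse{IsClosed: true})
	if _, err := stream.Recv(); err != nil && err != io.EOF {
		return err
	}
	return nil
}
```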
"github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error { + + // process initial request + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + var processedTsNs int64 + var messageCount int64 + subscriberId := in.Init.SubscriberId + + // TODO look it up + topicConfig := &messaging_pb.TopicConfiguration{ + // IsTransient: true, + } + + // get lock + tp := TopicPartition{ + Namespace: in.Init.Namespace, + Topic: in.Init.Topic, + Partition: in.Init.Partition, + } + fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String()) + defer func() { + fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs)) + }() + + lock := broker.topicManager.RequestLock(tp, topicConfig, false) + defer broker.topicManager.ReleaseLock(tp, false) + + isConnected := true + go func() { + for isConnected { + if _, err := stream.Recv(); err != nil { + // println("disconnecting connection to", subscriberId, tp.String()) + isConnected = false + lock.cond.Signal() + } + } + }() + + lastReadTime := time.Now() + switch in.Init.StartPosition { + case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP: + lastReadTime = time.Unix(0, in.Init.TimestampNs) + case messaging_pb.SubscriberMessage_InitMessage_LATEST: + case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: + lastReadTime = time.Unix(0, 0) + } + + // how to process each message + // an error returned will end the subscription + eachMessageFn := func(m *messaging_pb.Message) error { + err := stream.Send(&messaging_pb.BrokerMessage{ + Data: m, + }) + if err != nil { + glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) + } + return err + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { + m := &messaging_pb.Message{} + if err = proto.Unmarshal(logEntry.Data, m); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + return err + } + // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs) + if err = eachMessageFn(m); err != nil { + glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) + return err + } + if m.IsClose { + // println("processed EOF") + return io.EOF + } + processedTsNs = logEntry.TsNs + messageCount++ + return nil + } + + // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime) + + for { + + if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { + if err != io.EOF { + // println("stopping from persisted logs", err.Error()) + return err + } + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + + lastReadTime, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { + lock.Mutex.Lock() + lock.cond.Wait() + lock.Mutex.Unlock() + return isConnected + }, eachLogEntryFn) + if err != nil { + if err == log_buffer.ResumeFromDiskError { + continue + } + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } + + return err + +} + +func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { + startTime = startTime.UTC() + startDate := 
fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) + startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + + sizeBuf := make([]byte, 4) + startTsNs := startTime.UnixNano() + + topicDir := genTopicDir(tp.Namespace, tp.Topic) + partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) + + return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { + dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) + return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { + if dayEntry.Name == startDate { + if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 { + return nil + } + } + if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) { + return nil + } + // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) + chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) + defer chunkedFileReader.Close() + if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + chunkedFileReader.Close() + if err == io.EOF { + return err + } + return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) + } + return nil + }, "", false, 24*60) + }, startDate, true, 366) + +} diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go new file mode 100644 index 000000000..06162471c --- /dev/null +++ b/weed/messaging/broker/broker_server.go @@ -0,0 +1,114 @@ +package broker + +import ( + "context" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) + +type MessageBrokerOption struct { + Filers []string + DefaultReplication string + MaxMB int + Ip string + Port int + Cipher bool +} + +type MessageBroker struct { + option *MessageBrokerOption + grpcDialOption grpc.DialOption + topicManager *TopicManager +} + +func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { + + messageBroker = &MessageBroker{ + option: option, + grpcDialOption: grpcDialOption, + } + + messageBroker.topicManager = NewTopicManager(messageBroker) + + messageBroker.checkFilers() + + go messageBroker.keepConnectedToOneFiler() + + return messageBroker, nil +} + +func (broker *MessageBroker) keepConnectedToOneFiler() { + + for { + for _, filer := range broker.option.Filers { + broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.KeepConnected(ctx) + if err != nil { + glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + + initRequest := &filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + } + for _, tp := range broker.topicManager.ListTopicPartitions() { + initRequest.Resources = append(initRequest.Resources, tp.String()) + } + if err := stream.Send(&filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + }); err != nil { + glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + + // TODO send events of adding/removing topics + + 
glog.V(0).Infof("conntected with filer: %v", filer) + for { + if err := stream.Send(&filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + }); err != nil { + glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("send heartbeat") + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("received reply") + time.Sleep(11 * time.Second) + // println("woke up") + } + return nil + }) + time.Sleep(3 * time.Second) + } + } + +} + +func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithFilerClient(filer, broker.grpcDialOption, fn) + +} + +func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { + + return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return fn(client) + }) + +} diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go new file mode 100644 index 000000000..465a2a8f2 --- /dev/null +++ b/weed/messaging/broker/consistent_distribution.go @@ -0,0 +1,38 @@ +package broker + +import ( + "github.com/buraksezer/consistent" + "github.com/cespare/xxhash" +) + +type Member string + +func (m Member) String() string { + return string(m) +} + +type hasher struct{} + +func (h hasher) Sum64(data []byte) uint64 { + return xxhash.Sum64(data) +} + +func PickMember(members []string, key []byte) string { + cfg := consistent.Config{ + PartitionCount: 9791, + ReplicationFactor: 2, + Load: 1.25, + Hasher: hasher{}, + } + + cmembers := []consistent.Member{} + for _, m := range members { + cmembers = append(cmembers, Member(m)) + } + + c := consistent.New(cmembers, cfg) + + m := c.LocateKey(key) + + return m.String() +} diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go new file mode 100644 index 000000000..f58fe4e0e --- /dev/null +++ b/weed/messaging/broker/consistent_distribution_test.go @@ -0,0 +1,32 @@ +package broker + +import ( + "fmt" + "testing" +) + +func TestPickMember(t *testing.T) { + + servers := []string{ + "s1:port", + "s2:port", + "s3:port", + "s5:port", + "s4:port", + } + + total := 1000 + + distribution := make(map[string]int) + for i := 0; i < total; i++ { + tp := fmt.Sprintf("tp:%2d", i) + m := PickMember(servers, []byte(tp)) + // println(tp, "=>", m) + distribution[m]++ + } + + for member, count := range distribution { + fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers))) + } + +} diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go new file mode 100644 index 000000000..edddca813 --- /dev/null +++ b/weed/messaging/broker/topic_manager.go @@ -0,0 +1,124 @@ +package broker + +import ( + "fmt" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +type TopicPartition struct { + Namespace string + Topic string + Partition int32 +} + +const ( + TopicPartitionFmt = "%s/%s_%02d" +) + +func (tp *TopicPartition) String() string { + return fmt.Sprintf(TopicPartitionFmt, 
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
new file mode 100644
index 000000000..edddca813
--- /dev/null
+++ b/weed/messaging/broker/topic_manager.go
@@ -0,0 +1,124 @@
+package broker
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+	"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+type TopicPartition struct {
+	Namespace string
+	Topic     string
+	Partition int32
+}
+
+const (
+	TopicPartitionFmt = "%s/%s_%02d"
+)
+
+func (tp *TopicPartition) String() string {
+	return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
+}
+
+type TopicControl struct {
+	sync.Mutex
+	cond            *sync.Cond
+	subscriberCount int
+	publisherCount  int
+	logBuffer       *log_buffer.LogBuffer
+}
+
+type TopicManager struct {
+	sync.Mutex
+	topicControls map[TopicPartition]*TopicControl
+	broker        *MessageBroker
+}
+
+func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
+	return &TopicManager{
+		topicControls: make(map[TopicPartition]*TopicControl),
+		broker:        messageBroker,
+	}
+}
+
+func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+
+	flushFn := func(startTime, stopTime time.Time, buf []byte) {
+
+		if topicConfig.IsTransient {
+			// return
+		}
+
+		// fmt.Printf("flushing with topic config %+v\n", topicConfig)
+
+		startTime, stopTime = startTime.UTC(), stopTime.UTC()
+		targetFile := fmt.Sprintf(
+			"%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+			filer.TopicsDir, tp.Namespace, tp.Topic,
+			startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+			tp.Partition,
+		)
+
+		if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+			glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+		}
+	}
+	logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
+		tl.cond.Broadcast()
+	})
+
+	return logBuffer
+}
+
+func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
+	tm.Lock()
+	defer tm.Unlock()
+
+	tc, found := tm.topicControls[partition]
+	if !found {
+		tc = &TopicControl{}
+		tc.cond = sync.NewCond(&tc.Mutex)
+		tm.topicControls[partition] = tc
+		tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
+	}
+	if isPublisher {
+		tc.publisherCount++
+	} else {
+		tc.subscriberCount++
+	}
+	return tc
+}
+
+func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
+	tm.Lock()
+	defer tm.Unlock()
+
+	lock, found := tm.topicControls[partition]
+	if !found {
+		return
+	}
+	if isPublisher {
+		lock.publisherCount--
+	} else {
+		lock.subscriberCount--
+	}
+	if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
+		delete(tm.topicControls, partition)
+		lock.logBuffer.Shutdown()
+	}
+}
+
+func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
+	tm.Lock()
+	defer tm.Unlock()
+
+	for k := range tm.topicControls {
+		tps = append(tps, k)
+	}
+	return
+}
diff --git a/weed/messaging/msgclient/chan_config.go b/weed/messaging/msgclient/chan_config.go
new file mode 100644
index 000000000..a75678815
--- /dev/null
+++ b/weed/messaging/msgclient/chan_config.go
@@ -0,0 +1,5 @@
+package msgclient
+
+func (mc *MessagingClient) DeleteChannel(chanName string) error {
+	return mc.DeleteTopic("chan", chanName)
+}
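`RequestLock`/`ReleaseLock` reference-count publishers and subscribers per partition: the first caller creates the `TopicControl` and its log buffer, and the last one out shuts the buffer down. A sketch of that lifecycle as the gRPC handlers above use it (illustrative only, and meaningful only inside the `broker` package since `logBuffer` is unexported):

```go
package broker

import (
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)

// illustrative lifecycle, mirroring how Publish and Subscribe pair the calls
func lockLifecycle(tm *TopicManager, topicConfig *messaging_pb.TopicConfiguration, payload []byte) {
	tp := TopicPartition{Namespace: "chan", Topic: "demo", Partition: 0}

	tc := tm.RequestLock(tp, topicConfig, true) // publisher attaches; control + buffer created
	tm.RequestLock(tp, topicConfig, false)      // subscriber attaches to the same *TopicControl

	tc.logBuffer.AddToBuffer(nil, payload, time.Now().UnixNano())

	tm.ReleaseLock(tp, true)  // publisher leaves; the subscriber keeps the buffer alive
	tm.ReleaseLock(tp, false) // last one out: the control is dropped, the buffer is shut down
}
```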
diff --git a/weed/messaging/msgclient/chan_pub.go b/weed/messaging/msgclient/chan_pub.go
new file mode 100644
index 000000000..9bc88f7c0
--- /dev/null
+++ b/weed/messaging/msgclient/chan_pub.go
@@ -0,0 +1,76 @@
+package msgclient
+
+import (
+	"crypto/md5"
+	"hash"
+	"io"
+	"log"
+
+	"google.golang.org/grpc"
+
+	"github.com/chrislusf/seaweedfs/weed/messaging/broker"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type PubChannel struct {
+	client         messaging_pb.SeaweedMessaging_PublishClient
+	grpcConnection *grpc.ClientConn
+	md5hash        hash.Hash
+}
+
+func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
+	tp := broker.TopicPartition{
+		Namespace: "chan",
+		Topic:     chanName,
+		Partition: 0,
+	}
+	grpcConnection, err := mc.findBroker(tp)
+	if err != nil {
+		return nil, err
+	}
+	pc, err := setupPublisherClient(grpcConnection, tp)
+	if err != nil {
+		return nil, err
+	}
+	return &PubChannel{
+		client:         pc,
+		grpcConnection: grpcConnection,
+		md5hash:        md5.New(),
+	}, nil
+}
+
+func (pc *PubChannel) Publish(m []byte) error {
+	err := pc.client.Send(&messaging_pb.PublishRequest{
+		Data: &messaging_pb.Message{
+			Value: m,
+		},
+	})
+	if err == nil {
+		pc.md5hash.Write(m)
+	}
+	return err
+}
+
+func (pc *PubChannel) Close() error {
+
+	// println("send closing")
+	if err := pc.client.Send(&messaging_pb.PublishRequest{
+		Data: &messaging_pb.Message{
+			IsClose: true,
+		},
+	}); err != nil {
+		log.Printf("err send close: %v", err)
+	}
+	// println("receive closing")
+	if _, err := pc.client.Recv(); err != nil && err != io.EOF {
+		log.Printf("err receive close: %v", err)
+	}
+	// println("close connection")
+	if err := pc.grpcConnection.Close(); err != nil {
+		log.Printf("err connection close: %v", err)
+	}
+	return nil
+}
+
+func (pc *PubChannel) Md5() []byte {
+	return pc.md5hash.Sum(nil)
+}
diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/messaging/msgclient/chan_sub.go
new file mode 100644
index 000000000..213ff4666
--- /dev/null
+++ b/weed/messaging/msgclient/chan_sub.go
@@ -0,0 +1,85 @@
+package msgclient
+
+import (
+	"context"
+	"crypto/md5"
+	"hash"
+	"io"
+	"log"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/messaging/broker"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type SubChannel struct {
+	ch      chan []byte
+	stream  messaging_pb.SeaweedMessaging_SubscribeClient
+	md5hash hash.Hash
+	cancel  context.CancelFunc
+}
+
+func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
+	tp := broker.TopicPartition{
+		Namespace: "chan",
+		Topic:     chanName,
+		Partition: 0,
+	}
+	grpcConnection, err := mc.findBroker(tp)
+	if err != nil {
+		return nil, err
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
+	if err != nil {
+		return nil, err
+	}
+
+	t := &SubChannel{
+		ch:      make(chan []byte),
+		stream:  sc,
+		md5hash: md5.New(),
+		cancel:  cancel,
+	}
+
+	go func() {
+		for {
+			resp, subErr := t.stream.Recv()
+			if subErr == io.EOF {
+				return
+			}
+			if subErr != nil {
+				log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
+				return
+			}
+			if resp.Data == nil {
+				// this could be heartbeat from broker
+				continue
+			}
+			if resp.Data.IsClose {
+				t.stream.Send(&messaging_pb.SubscriberMessage{
+					IsClose: true,
+				})
+				close(t.ch)
+				cancel()
+				return
+			}
+			t.ch <- resp.Data.Value
+			t.md5hash.Write(resp.Data.Value)
+		}
+	}()
+
+	return t, nil
+}
+
+func (sc *SubChannel) Channel() chan []byte {
+	return sc.ch
+}
+
+func (sc *SubChannel) Md5() []byte {
+	return sc.md5hash.Sum(nil)
+}
+
+func (sc *SubChannel) Cancel() {
+	sc.cancel()
+}
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
new file mode 100644
index 000000000..4d7ef2b8e
--- /dev/null
+++ b/weed/messaging/msgclient/client.go
@@ -0,0 +1,55 @@
+package msgclient
+
+import (
+	"context"
+	"fmt"
+	"log"
+
+	"google.golang.org/grpc"
+
+	"github.com/chrislusf/seaweedfs/weed/messaging/broker"
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessagingClient struct {
+	bootstrapBrokers []string
+	grpcConnections  map[broker.TopicPartition]*grpc.ClientConn
+	grpcDialOption   grpc.DialOption
+}
+
+func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
+	return &MessagingClient{
+		bootstrapBrokers: bootstrapBrokers,
+		grpcConnections:  make(map[broker.TopicPartition]*grpc.ClientConn),
+		grpcDialOption:   security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
+	}
+}
+
+func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
+
+	for _, broker := range mc.bootstrapBrokers {
+		grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+		if err != nil {
+			log.Printf("dial broker %s: %v", broker, err)
+			continue
+		}
+		defer grpcConnection.Close()
+
+		resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
+			&messaging_pb.FindBrokerRequest{
+				Namespace: tp.Namespace,
+				Topic:     tp.Topic,
+				Parition:  tp.Partition,
+			})
+		if err != nil {
+			return nil, err
+		}
+
+		targetBroker := resp.Broker
+		return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
+	}
+	return nil, fmt.Errorf("no broker found for %+v", tp)
+}
diff --git a/weed/messaging/msgclient/config.go b/weed/messaging/msgclient/config.go
new file mode 100644
index 000000000..2b9eba1a8
--- /dev/null
+++ b/weed/messaging/msgclient/config.go
@@ -0,0 +1,63 @@
+package msgclient
+
+import (
+	"context"
+	"log"
+
+	"github.com/chrislusf/seaweedfs/weed/messaging/broker"
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
+
+	return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+		_, err := client.ConfigureTopic(context.Background(),
+			&messaging_pb.ConfigureTopicRequest{
+				Namespace: tp.Namespace,
+				Topic:     tp.Topic,
+				Configuration: &messaging_pb.TopicConfiguration{
+					PartitionCount: 0,
+					Collection:     "",
+					Replication:    "",
+					IsTransient:    false,
+					Partitoning:    0,
+				},
+			})
+		return err
+	})
+
+}
+
+func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
+
+	return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+		_, err := client.DeleteTopic(context.Background(),
+			&messaging_pb.DeleteTopicRequest{
+				Namespace: namespace,
+				Topic:     topic,
+			})
+		return err
+	})
+}
+
+func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+
+	var lastErr error
+	for _, broker := range mc.bootstrapBrokers {
+		grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+		if err != nil {
+			log.Printf("dial broker %s: %v", broker, err)
+			continue
+		}
+		defer grpcConnection.Close()
+
+		err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
+		if err == nil {
+			return nil
+		}
+		lastErr = err
+	}
+
+	return lastErr
+}
diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go
new file mode 100644
index 000000000..1aa483ff8
--- /dev/null
+++ b/weed/messaging/msgclient/publisher.go
@@ -0,0 +1,118 @@
+package msgclient
+
+import (
+	"context"
+
+	"github.com/OneOfOne/xxhash"
+	"google.golang.org/grpc"
+
+	"github.com/chrislusf/seaweedfs/weed/messaging/broker"
+	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type Publisher struct {
+	publishClients     []messaging_pb.SeaweedMessaging_PublishClient
+	topicConfiguration *messaging_pb.TopicConfiguration
+	messageCount       uint64
+	publisherId        string
+}
+
+func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
+	// read topic configuration
+	topicConfiguration := &messaging_pb.TopicConfiguration{
+		PartitionCount: 4,
+	}
+	publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
+	for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+		tp := broker.TopicPartition{
+			Namespace: namespace,
+			Topic:     topic,
+			Partition: int32(i),
+		}
+		grpcClientConn, err := mc.findBroker(tp)
+		if err != nil {
+			return nil, err
+		}
+		client, err := setupPublisherClient(grpcClientConn, tp)
+		if err != nil {
+			return nil, err
+		}
+		publishClients[i] = client
+	}
+	return &Publisher{
+		publishClients:     publishClients,
+		topicConfiguration: topicConfiguration,
+	}, nil
+}
+
+func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
+
+	stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	// send init message
+	err = stream.Send(&messaging_pb.PublishRequest{
+		Init: &messaging_pb.PublishRequest_InitMessage{
+			Namespace: tp.Namespace,
+			Topic:     tp.Topic,
+			Partition: tp.Partition,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// process init response
+	initResponse, err := stream.Recv()
+	if err != nil {
+		return nil, err
+	}
+	if initResponse.Redirect != nil {
+		// TODO follow redirection
+	}
+	if initResponse.Config != nil {
+	}
+
+	// set up a loop that watches for control messages
+	doneChan := make(chan error, 1)
+	go func() {
+		for {
+			in, err := stream.Recv()
+			if err != nil {
+				doneChan <- err
+				return
+			}
+			if in.Redirect != nil {
+			}
+			if in.Config != nil {
+			}
+		}
+	}()
+
+	return stream, nil
+
+}
+
+func (p *Publisher) Publish(m *messaging_pb.Message) error {
+	hashValue := p.messageCount
+	p.messageCount++
+	if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
+		if m.Key != nil {
+			hashValue = xxhash.Checksum64(m.Key)
+		}
+	} else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
+		hashValue = xxhash.Checksum64(m.Key)
+	} else {
+		// round robin
+	}
+
+	idx := int(hashValue) % len(p.publishClients)
+	if idx < 0 {
+		idx += len(p.publishClients)
+	}
+	return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
+		Data: m,
+	})
+}
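A usage sketch for the raw `Publisher`, which opens one publish stream per partition (currently hard-coded to 4) and picks a stream by key hash or round robin, depending on the topic's `Partitoning` mode. The broker address, namespace, and payload are illustrative:

```go
package main

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/messaging/msgclient"
	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)

func main() {
	mc := msgclient.NewMessagingClient("localhost:17777")

	publisher, err := mc.NewPublisher("demo-publisher", "test", "events")
	if err != nil {
		log.Fatal(err)
	}

	// under a key-hash Partitoning mode, equal keys land on the same partition
	if err := publisher.Publish(&messaging_pb.Message{
		Key:   []byte("user-42"),
		Value: []byte(`{"event":"login"}`),
	}); err != nil {
		log.Fatal(err)
	}
}
```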
>= 0 && i != partitionId { + continue + } + tp := broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime) + if err != nil { + return nil, err + } + subscriberClients[i] = client + subscriberCancels[i] = cancel + } + + return &Subscriber{ + subscriberClients: subscriberClients, + subscriberCancels: subscriberCancels, + subscriberId: subscriberId, + }, nil +} + +func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { + stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) + if err != nil { + return + } + + // send init message + err = stream.Send(&messaging_pb.SubscriberMessage{ + Init: &messaging_pb.SubscriberMessage_InitMessage{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, + StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, + TimestampNs: startTime.UnixNano(), + SubscriberId: subscriberId, + }, + }) + if err != nil { + return + } + + return stream, nil +} + +func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { + for { + resp, listenErr := subscriberClient.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + println(listenErr.Error()) + return listenErr + } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } + processFn(resp.Data) + } +} + +// Subscribe starts goroutines to process the messages +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + var wg sync.WaitGroup + for i := 0; i < len(s.subscriberClients); i++ { + if s.subscriberClients[i] != nil { + wg.Add(1) + go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) { + defer wg.Done() + doSubscribe(subscriberClient, processFn) + }(s.subscriberClients[i]) + } + } + wg.Wait() +} + +func (s *Subscriber) Shutdown() { + for i := 0; i < len(s.subscriberClients); i++ { + if s.subscriberCancels[i] != nil { + s.subscriberCancels[i]() + } + } +} |
