aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-17 02:29:38 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-17 02:29:38 -0700
commit3f3dba5a68287da6a5a8e87945c1eca2bcf925f6 (patch)
tree69d5fbc5070e301f59b325d66c1fbd42483be661 /weed/messaging
parent2a458972373856cc1b31b0b7433b2aaa000bb700 (diff)
downloadseaweedfs-3f3dba5a68287da6a5a8e87945c1eca2bcf925f6.tar.xz
seaweedfs-3f3dba5a68287da6a5a8e87945c1eca2bcf925f6.zip
broker: append message logs
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker_append.go116
-rw-r--r--weed/messaging/msg_broker_grpc_server.go17
2 files changed, 126 insertions, 7 deletions
diff --git a/weed/messaging/broker_append.go b/weed/messaging/broker_append.go
new file mode 100644
index 000000000..07b3755c0
--- /dev/null
+++ b/weed/messaging/broker_append.go
@@ -0,0 +1,116 @@
+package messaging
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
+
+ assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
+ if err2 != nil {
+ return err2
+ }
+
+ dir, name := util.FullPath(targetFile).DirAndName()
+
+ chunk := &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: 0, // needs to be fixed during appending
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ IsGzipped: uploadResult.Gzip > 0,
+ }
+
+ // append the chunk
+ if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AppendToEntryRequest{
+ Directory: dir,
+ EntryName: name,
+ Chunks: []*filer_pb.FileChunk{chunk},
+ }
+
+ _, err := client.AppendToEntry(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("append to file %v: %v", request, err)
+ return err
+ }
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("append to file %v: %v", targetFile, err)
+ }
+
+ return nil
+}
+
+func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+
+ var assignResult = &operation.AssignResult{}
+ var collection, replication string
+
+ // assign a volume location
+ if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: topicConfig.Replication,
+ Collection: topicConfig.Collection,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ assignResult.Auth = security.EncodedJwt(resp.Auth)
+ assignResult.Fid = resp.FileId
+ assignResult.Url = resp.Url
+ assignResult.PublicUrl = resp.PublicUrl
+ assignResult.Count = uint64(resp.Count)
+
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return nil, nil, err
+ }
+
+ // upload data
+ targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+ uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, uploadResult, nil
+}
+
+func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+
+ for _, filer := range broker.option.Filers {
+ if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ glog.V(0).Infof("fail to connect to %s: %v", filer, err)
+ } else {
+ break
+ }
+ }
+
+ return
+
+}
diff --git a/weed/messaging/msg_broker_grpc_server.go b/weed/messaging/msg_broker_grpc_server.go
index 5b93b8f62..f4b8321e2 100644
--- a/weed/messaging/msg_broker_grpc_server.go
+++ b/weed/messaging/msg_broker_grpc_server.go
@@ -32,6 +32,11 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
updatesChan := make(chan int32)
+ // TODO look it up
+ topicConfig := &messaging_pb.TopicConfiguration{
+
+ }
+
go func() {
for update := range updatesChan {
if err := stream.Send(&messaging_pb.PublishResponse{
@@ -47,18 +52,16 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
- //targetFile :=
- fmt.Sprintf("%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+ targetFile := fmt.Sprintf(
+ "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
filer2.TopicsDir, namespace, topic,
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
partition,
)
- /*
- if err := f.appendToFile(targetFile, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
- */
+ if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
}, func() {
// notify subscribers