diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-17 02:29:38 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-17 02:29:38 -0700 |
| commit | 3f3dba5a68287da6a5a8e87945c1eca2bcf925f6 (patch) | |
| tree | 69d5fbc5070e301f59b325d66c1fbd42483be661 /weed/messaging/broker_append.go | |
| parent | 2a458972373856cc1b31b0b7433b2aaa000bb700 (diff) | |
| download | seaweedfs-3f3dba5a68287da6a5a8e87945c1eca2bcf925f6.tar.xz seaweedfs-3f3dba5a68287da6a5a8e87945c1eca2bcf925f6.zip | |
broker: append message logs
Diffstat (limited to 'weed/messaging/broker_append.go')
| -rw-r--r-- | weed/messaging/broker_append.go | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/weed/messaging/broker_append.go b/weed/messaging/broker_append.go new file mode 100644 index 000000000..07b3755c0 --- /dev/null +++ b/weed/messaging/broker_append.go @@ -0,0 +1,116 @@ +package messaging + +import ( + "context" + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error { + + assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data) + if err2 != nil { + return err2 + } + + dir, name := util.FullPath(targetFile).DirAndName() + + chunk := &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: 0, // needs to be fixed during appending + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + ETag: uploadResult.ETag, + IsGzipped: uploadResult.Gzip > 0, + } + + // append the chunk + if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AppendToEntryRequest{ + Directory: dir, + EntryName: name, + Chunks: []*filer_pb.FileChunk{chunk}, + } + + _, err := client.AppendToEntry(context.Background(), request) + if err != nil { + glog.V(0).Infof("append to file %v: %v", request, err) + return err + } + + return nil + }); err != nil { + return fmt.Errorf("append to file %v: %v", targetFile, err) + } + + return nil +} + +func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { + + var assignResult = &operation.AssignResult{} + var collection, replication string + + // assign a volume location + if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: topicConfig.Replication, + Collection: topicConfig.Collection, + } + + resp, err := client.AssignVolume(context.Background(), request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } + + assignResult.Auth = security.EncodedJwt(resp.Auth) + assignResult.Fid = resp.FileId + assignResult.Url = resp.Url + assignResult.PublicUrl = resp.PublicUrl + assignResult.Count = uint64(resp.Count) + + collection, replication = resp.Collection, resp.Replication + + return nil + }); err != nil { + return nil, nil, err + } + + // upload data + targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth) + if err != nil { + return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) + } + // println("uploaded to", targetUrl) + return assignResult, uploadResult, nil +} + +func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { + + for _, filer := range broker.option.Filers { + if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil { + glog.V(0).Infof("fail to connect to %s: %v", filer, err) + } else { + break + } + } + + return + +} |
