aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker_append.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-17 02:29:38 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-17 02:29:38 -0700
commit3f3dba5a68287da6a5a8e87945c1eca2bcf925f6 (patch)
tree69d5fbc5070e301f59b325d66c1fbd42483be661 /weed/messaging/broker_append.go
parent2a458972373856cc1b31b0b7433b2aaa000bb700 (diff)
downloadseaweedfs-3f3dba5a68287da6a5a8e87945c1eca2bcf925f6.tar.xz
seaweedfs-3f3dba5a68287da6a5a8e87945c1eca2bcf925f6.zip
broker: append message logs
Diffstat (limited to 'weed/messaging/broker_append.go')
-rw-r--r--weed/messaging/broker_append.go116
1 files changed, 116 insertions, 0 deletions
diff --git a/weed/messaging/broker_append.go b/weed/messaging/broker_append.go
new file mode 100644
index 000000000..07b3755c0
--- /dev/null
+++ b/weed/messaging/broker_append.go
@@ -0,0 +1,116 @@
+package messaging
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
+
+ assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
+ if err2 != nil {
+ return err2
+ }
+
+ dir, name := util.FullPath(targetFile).DirAndName()
+
+ chunk := &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: 0, // needs to be fixed during appending
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ IsGzipped: uploadResult.Gzip > 0,
+ }
+
+ // append the chunk
+ if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AppendToEntryRequest{
+ Directory: dir,
+ EntryName: name,
+ Chunks: []*filer_pb.FileChunk{chunk},
+ }
+
+ _, err := client.AppendToEntry(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("append to file %v: %v", request, err)
+ return err
+ }
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("append to file %v: %v", targetFile, err)
+ }
+
+ return nil
+}
+
+func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+
+ var assignResult = &operation.AssignResult{}
+ var collection, replication string
+
+ // assign a volume location
+ if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: topicConfig.Replication,
+ Collection: topicConfig.Collection,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ assignResult.Auth = security.EncodedJwt(resp.Auth)
+ assignResult.Fid = resp.FileId
+ assignResult.Url = resp.Url
+ assignResult.PublicUrl = resp.PublicUrl
+ assignResult.Count = uint64(resp.Count)
+
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return nil, nil, err
+ }
+
+ // upload data
+ targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+ uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, uploadResult, nil
+}
+
+func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+
+ for _, filer := range broker.option.Filers {
+ if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ glog.V(0).Infof("fail to connect to %s: %v", filer, err)
+ } else {
+ break
+ }
+ }
+
+ return
+
+}