aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_append.go
blob: 599efad9818058c5ec3ba6d88406a3522a122b20 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package broker

import (
	"context"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/security"
	"io"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// appendToFile stores data as a new chunk and appends that chunk to the filer
// entry located at targetFile.
//
// The data is first uploaded through assignAndUpload (which takes the
// replication and collection settings from topicConfig). The resulting chunk
// is then attached to the entry, identified by the directory and name parts of
// targetFile, with an AppendToEntry request issued through WithFilerClient.
//
// An assignAndUpload failure is returned unchanged. An AppendToEntry failure
// is returned wrapped with the target path; the underlying error stays
// reachable through errors.Is / errors.As.
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {

	assignResult, uploadResult, err := broker.assignAndUpload(topicConfig, data)
	if err != nil {
		return err
	}

	dir, name := util.FullPath(targetFile).DirAndName()

	// append the chunk
	appendErr := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.AppendToEntryRequest{
			Directory: dir,
			EntryName: name,
			// NOTE(review): the second argument is presumably the chunk offset;
			// confirm against ToPbFileChunk.
			Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
		}

		if _, err := client.AppendToEntry(context.Background(), request); err != nil {
			glog.V(0).Infof("append to file %v: %v", request, err)
			return err
		}

		return nil
	})
	if appendErr != nil {
		// %w keeps the cause in the error chain for callers.
		return fmt.Errorf("append to file %v: %w", targetFile, appendErr)
	}

	return nil
}

// assignAndUpload asks a filer for a volume assignment and uploads data to the
// assigned location.
//
// The assignment is requested with AssignVolume (Count 1, replication and
// collection taken from topicConfig) through WithFilerClient, and the request
// is retried by util.Retry under the name "assignVolume". A response with a
// non-empty Error field is treated as a failure. The data is then uploaded
// with operation.UploadData to http://<assigned url>/<fid>, using the broker's
// cipher option and the JWT that came with the assignment.
//
// On success it returns the assignment and the upload result. On failure both
// results are nil; an upload failure is wrapped with the target URL and keeps
// the underlying error in its chain.
func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {

	assignResult := &operation.AssignResult{}

	// assign a volume location
	if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		return util.Retry("assignVolume", func() error {
			request := &filer_pb.AssignVolumeRequest{
				Count:       1,
				Replication: topicConfig.Replication,
				Collection:  topicConfig.Collection,
			}

			resp, err := client.AssignVolume(context.Background(), request)
			if err != nil {
				glog.V(0).Infof("assign volume failure %v: %v", request, err)
				return err
			}
			// The RPC can succeed while the filer still reports a failure
			// in the response body.
			if resp.Error != "" {
				return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
			}

			assignResult.Auth = security.EncodedJwt(resp.Auth)
			assignResult.Fid = resp.FileId
			assignResult.Url = resp.Url
			assignResult.PublicUrl = resp.PublicUrl
			assignResult.Count = uint64(resp.Count)

			return nil
		})
	}); err != nil {
		return nil, nil, err
	}

	// upload data
	targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
	uploadOption := &operation.UploadOption{
		UploadUrl:         targetUrl,
		Filename:          "",
		Cipher:            broker.option.Cipher,
		IsInputCompressed: false,
		MimeType:          "",
		PairMap:           nil,
		Jwt:               assignResult.Auth,
	}
	// NOTE(review): only err is checked here; confirm whether uploadResult can
	// also carry a failure that should be surfaced.
	uploadResult, err := operation.UploadData(data, uploadOption)
	if err != nil {
		return nil, nil, fmt.Errorf("upload data %s: %w", targetUrl, err)
	}
	// println("uploaded to", targetUrl)
	return assignResult, uploadResult, nil
}

// Compile-time check: this conversion only builds if *MessageBroker can be
// converted to filer_pb.FilerClient (presumably an interface, so a missing or
// mismatched method breaks the build here rather than at a call site).
var _ = filer_pb.FilerClient(&MessageBroker{})

// WithFilerClient runs fn through pb.WithFilerClient against the filers listed
// in broker.option.Filers, in order, using the broker's gRPC dial option.
//
// It stops at the first filer for which the call returns nil. An io.EOF result
// is returned immediately without trying further filers. Any other error is
// logged and the next filer is tried; if every filer fails, the last error is
// returned. Because the same fn is handed to each attempt, fn may be invoked
// more than once when earlier attempts fail.
//
// If no filers are configured an error is returned, so that callers never see
// a nil result for an fn that was not run.
func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {

	if len(broker.option.Filers) == 0 {
		return fmt.Errorf("no filer configured")
	}

	for _, filer := range broker.option.Filers {
		err = pb.WithFilerClient(filer, broker.grpcDialOption, fn)
		if err == nil || err == io.EOF {
			return err
		}
		// Logged for every non-EOF failure, including errors returned by fn.
		glog.V(0).Infof("fail to connect to %s: %v", filer, err)
	}

	return err
}

// AdjustedUrl returns the Url field of location unchanged; the broker applies
// no rewriting to the address.
func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
	url := location.Url
	return url
}