aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_configure.go35
-rw-r--r--weed/mq/broker/broker_server.go1
-rw-r--r--weed/mq/broker/broker_write.go82
-rw-r--r--weed/mq/topic/local_partition.go11
-rw-r--r--weed/util/log_buffer/log_buffer.go6
5 files changed, 125 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 7f7c8f84b..042621a4c 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -3,12 +3,16 @@ package broker
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "time"
)
// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
@@ -73,8 +77,9 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
// drain existing topic partition subscriptions
- for _, brokerPartition := range request.BrokerPartitionAssignments {
- localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
+ for _, assignment := range request.BrokerPartitionAssignments {
+ topicPartition := topic.FromPbPartition(assignment.Partition)
+ localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, topicPartition))
if request.IsDraining {
// TODO drain existing topic partition subscriptions
@@ -105,3 +110,29 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
return ret, nil
}
+
+func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition topic.Partition) log_buffer.LogFlushFuncType {
+ topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+ partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
+ partitionDir := fmt.Sprintf("%s/%s/%4d-%4d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
+
+ return func(startTime, stopTime time.Time, buf []byte) {
+ if len(buf) == 0 {
+ return
+ }
+
+ startTime, stopTime = startTime.UTC(), stopTime.UTC()
+ fileName := startTime.Format(topic.TIME_FORMAT)
+
+ targetFile := fmt.Sprintf("%s/%s",partitionDir, fileName)
+
+ for {
+ if err := b.appendToFile(targetFile, buf); err != nil {
+ glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+ time.Sleep(737 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ }
+}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 34a263032..615964621 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -28,6 +28,7 @@ type MessageQueueBrokerOption struct {
Ip string
Port int
Cipher bool
+ VolumeServerAccess string // how to access volume servers
}
type MessageQueueBroker struct {
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go
new file mode 100644
index 000000000..866cd17c2
--- /dev/null
+++ b/weed/mq/broker/broker_write.go
@@ -0,0 +1,82 @@
+package broker
+
+import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "os"
+ "time"
+)
+
+func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
+
+ fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
+ if err2 != nil {
+ return err2
+ }
+
+ // find out existing entry
+ fullpath := util.FullPath(targetFile)
+ dir, name := fullpath.DirAndName()
+ entry, err := filer_pb.GetEntry(b, fullpath)
+ var offset int64 = 0
+ if err == filer_pb.ErrNotFound {
+ entry = &filer_pb.Entry{
+ Name: name,
+ IsDirectory: false,
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ FileMode: uint32(os.FileMode(0644)),
+ Uid: uint32(os.Getuid()),
+ Gid: uint32(os.Getgid()),
+ },
+ }
+ } else if err != nil {
+ return fmt.Errorf("find %s: %v", fullpath, err)
+ } else {
+ offset = int64(filer.TotalSize(entry.GetChunks()))
+ }
+
+ // append to existing chunks
+ entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
+
+ // update the entry
+ return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ })
+}
+
+func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
+
+ reader := util.NewBytesReader(data)
+ fileId, uploadResult, err, _ = operation.UploadWithRetry(
+ b,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: b.option.DefaultReplication,
+ Collection: "topics",
+ // TtlSec: wfs.option.TtlSec,
+ // DiskType: string(wfs.option.DiskType),
+ DataCenter: b.option.DataCenter,
+ Path: targetFile,
+ },
+ &operation.UploadOption{
+ Cipher: b.option.Cipher,
+ },
+ func(host, fileId string) string {
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if b.option.VolumeServerAccess == "filerProxy" {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
+ }
+ return fileUrl
+ },
+ reader,
+ )
+ return
+}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 36712bbbd..5cf315ddb 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -21,7 +21,8 @@ type LocalPartition struct {
Subscribers *LocalPartitionSubscribers
}
-func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
+var TIME_FORMAT = "2006-01-02-15-04-05"
+func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType) *LocalPartition {
return &LocalPartition{
Partition: partition,
isLeader: isLeader,
@@ -29,9 +30,7 @@ func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.
logBuffer: log_buffer.NewLogBuffer(
fmt.Sprintf("%d/%4d-%4d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
2*time.Minute,
- func(startTime, stopTime time.Time, buf []byte) {
-
- },
+ logFlushFn,
func() {
},
@@ -63,13 +62,13 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message
return p.logBuffer.GetEarliestPosition()
}
-func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition {
+func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType) *LocalPartition {
isLeader := assignment.LeaderBroker == string(self)
followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
for i, followerBroker := range assignment.FollowerBrokers {
followers[i] = pb.ServerAddress(followerBroker)
}
- return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers)
+ return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers, logFlushFn)
}
func (p *LocalPartition) closePublishers() {
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 2cb6f8c41..567d660ef 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -22,6 +22,8 @@ type dataToFlush struct {
data *bytes.Buffer
}
+type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte)
+
type LogBuffer struct {
name string
prevBuffers *SealedBuffers
@@ -34,7 +36,7 @@ type LogBuffer struct {
lastFlushTime time.Time
sizeBuf []byte
flushInterval time.Duration
- flushFn func(startTime, stopTime time.Time, buf []byte)
+ flushFn LogFlushFuncType
notifyFn func()
isStopping *atomic.Bool
flushChan chan *dataToFlush
@@ -42,7 +44,7 @@ type LogBuffer struct {
sync.RWMutex
}
-func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
+func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType, notifyFn func()) *LogBuffer {
lb := &LogBuffer{
name: name,
prevBuffers: newSealedBuffers(PreviousBufferCount),