aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-30 03:05:34 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-30 03:05:34 -0700
commitf9b6178b8f12bd1b34a3756b15d6cee69930b26c (patch)
tree2faaf74e906fc45766d4d6422a9cf583461cd664 /weed/messaging/broker
parent8c73410a51441d7f9f1140a8996dd3eb1f191f2e (diff)
downloadseaweedfs-f9b6178b8f12bd1b34a3756b15d6cee69930b26c.tar.xz
seaweedfs-f9b6178b8f12bd1b34a3756b15d6cee69930b26c.zip
log messages
Diffstat (limited to 'weed/messaging/broker')
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go2
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go10
-rw-r--r--weed/messaging/broker/topic_lock.go2
3 files changed, 9 insertions, 5 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index 985f708b5..b3a909a6c 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -22,7 +22,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
- IsTransient: true,
+ // IsTransient: true,
}
// send init response
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index c5e033420..379063eed 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -34,7 +34,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
- IsTransient: true,
+ // IsTransient: true,
}
if err = stream.Send(&messaging_pb.BrokerMessage{
@@ -79,7 +79,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
return err
}
- // fmt.Printf("sending : %d bytes\n", len(m.Value))
+ // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
if err = eachMessageFn(m); err != nil {
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
@@ -115,6 +115,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
startTsNs := startTime.UnixNano()
topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic)
+ partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
@@ -124,7 +125,10 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
return nil
}
}
- // println("processing", hourMinuteEntry.FullPath)
+ if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){
+ return nil
+ }
+ // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
defer chunkedFileReader.Close()
if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go
index d179537c9..f8a5aa171 100644
--- a/weed/messaging/broker/topic_lock.go
+++ b/weed/messaging/broker/topic_lock.go
@@ -42,7 +42,7 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC
flushFn := func(startTime, stopTime time.Time, buf []byte) {
if topicConfig.IsTransient {
- return
+ // return
}
// fmt.Printf("flushing with topic config %+v\n", topicConfig)