diff options
Diffstat (limited to 'weed/messaging/broker')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 2 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 10 | ||||
| -rw-r--r-- | weed/messaging/broker/topic_lock.go | 2 |
3 files changed, 9 insertions, 5 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index 985f708b5..b3a909a6c 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -22,7 +22,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ - IsTransient: true, + // IsTransient: true, } // send init response diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index c5e033420..379063eed 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -34,7 +34,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ - IsTransient: true, + // IsTransient: true, } if err = stream.Send(&messaging_pb.BrokerMessage{ @@ -79,7 +79,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) return err } - // fmt.Printf("sending : %d bytes\n", len(m.Value)) + // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs) if err = eachMessageFn(m); err != nil { glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) return err @@ -115,6 +115,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim startTsNs := startTime.UnixNano() topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic) + partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) @@ -124,7 +125,10 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim return nil } } - // println("processing", hourMinuteEntry.FullPath) + if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){ + return nil + } + // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) defer chunkedFileReader.Close() if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go index d179537c9..f8a5aa171 100644 --- a/weed/messaging/broker/topic_lock.go +++ b/weed/messaging/broker/topic_lock.go @@ -42,7 +42,7 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC flushFn := func(startTime, stopTime time.Time, buf []byte) { if topicConfig.IsTransient { - return + // return } // fmt.Printf("flushing with topic config %+v\n", topicConfig) |
