diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-30 03:05:34 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-30 03:05:34 -0700 |
| commit | f9b6178b8f12bd1b34a3756b15d6cee69930b26c (patch) | |
| tree | 2faaf74e906fc45766d4d6422a9cf583461cd664 /weed/messaging | |
| parent | 8c73410a51441d7f9f1140a8996dd3eb1f191f2e (diff) | |
| download | seaweedfs-f9b6178b8f12bd1b34a3756b15d6cee69930b26c.tar.xz seaweedfs-f9b6178b8f12bd1b34a3756b15d6cee69930b26c.zip | |
log messages
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 2 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 10 | ||||
| -rw-r--r-- | weed/messaging/broker/topic_lock.go | 2 |
3 files changed, 9 insertions, 5 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index 985f708b5..b3a909a6c 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -22,7 +22,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ - IsTransient: true, + // IsTransient: true, } // send init response diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index c5e033420..379063eed 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -34,7 +34,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ - IsTransient: true, + // IsTransient: true, } if err = stream.Send(&messaging_pb.BrokerMessage{ @@ -79,7 +79,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) return err } - // fmt.Printf("sending : %d bytes\n", len(m.Value)) + // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs) if err = eachMessageFn(m); err != nil { glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) return err @@ -115,6 +115,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim startTsNs := startTime.UnixNano() topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic) + partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) @@ -124,7 +125,10 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim return nil } } - // println("processing", hourMinuteEntry.FullPath) + if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){ + return nil + } + // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) defer chunkedFileReader.Close() if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go index d179537c9..f8a5aa171 100644 --- a/weed/messaging/broker/topic_lock.go +++ b/weed/messaging/broker/topic_lock.go @@ -42,7 +42,7 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC flushFn := func(startTime, stopTime time.Time, buf []byte) { if topicConfig.IsTransient { - return + // return } // fmt.Printf("flushing with topic config %+v\n", topicConfig) |
