aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-10 03:48:35 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-10 03:48:35 -0700
commit6bf3eb69cb9abd02e1d63ecdee485d198a3cab9a (patch)
treef1cf35c6d8322db60b36793cf8bf1f0d0fdc537c
parent78afb8bf4699f85083160f8bfee2f04493985cd1 (diff)
downloadseaweedfs-6bf3eb69cb9abd02e1d63ecdee485d198a3cab9a.tar.xz
seaweedfs-6bf3eb69cb9abd02e1d63ecdee485d198a3cab9a.zip
async chan write read, no write for closed chan
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go22
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go1
-rw-r--r--weed/messaging/broker/topic_lock.go3
-rw-r--r--weed/messaging/msgclient/sub_chan.go2
-rw-r--r--weed/util/log_buffer/log_buffer.go11
5 files changed, 34 insertions, 5 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index 61e53b433..573706c06 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -1,11 +1,15 @@
package broker
import (
+ "crypto/md5"
+ "fmt"
"io"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -44,9 +48,19 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
Topic: in.Init.Topic,
Partition: in.Init.Partition,
}
+
+ tpDir := fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, tp.Namespace, tp.Topic)
+ md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
+ // println("chan data stored under", tpDir, "as", md5File)
+
+ if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
+ return fmt.Errorf("channel is already closed")
+ }
+
tl := broker.topicLocks.RequestLock(tp, topicConfig, true)
defer broker.topicLocks.ReleaseLock(tp, true)
+ md5hash := md5.New()
// process each message
for {
// println("recv")
@@ -78,8 +92,16 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
break
}
+ md5hash.Write(in.Data.Value)
+
+ }
+
+ if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
+ glog.V(0).Infof("err writing %s: %v", md5File, err)
}
+ // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
+
// send the close ack
// println("server send ack closing")
if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index f8fd16a14..86ee6923d 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -57,6 +57,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
lastReadTime = time.Unix(0, in.Init.TimestampNs)
case messaging_pb.SubscriberMessage_InitMessage_LATEST:
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
+ lastReadTime = time.Unix(0, 0)
}
var processedTsNs int64
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go
index f3a66a2f5..4c4803275 100644
--- a/weed/messaging/broker/topic_lock.go
+++ b/weed/messaging/broker/topic_lock.go
@@ -17,7 +17,7 @@ type TopicPartition struct {
Partition int32
}
const (
- TopicPartitionFmt = "%s/%s_%2d"
+ TopicPartitionFmt = "%s/%s_%02d"
)
func (tp *TopicPartition) String() string {
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
@@ -106,6 +106,7 @@ func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
}
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
delete(tl.locks, partition)
+ lock.logBuffer.Shutdown()
}
}
diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go
index aae5c0c71..ed25a850c 100644
--- a/weed/messaging/msgclient/sub_chan.go
+++ b/weed/messaging/msgclient/sub_chan.go
@@ -55,8 +55,8 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
close(t.ch)
return
}
- t.md5hash.Write(resp.Data.Value)
t.ch <- resp.Data.Value
+ t.md5hash.Write(resp.Data.Value)
}
}()
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 6ba7f3737..67c44dc57 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -98,13 +98,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}
func (m *LogBuffer) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+
if m.isStopping {
return
}
m.isStopping = true
- m.Lock()
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
close(m.flushChan)
}
@@ -123,10 +124,14 @@ func (m *LogBuffer) loopInterval() {
for !m.isStopping {
time.Sleep(m.flushInterval)
m.Lock()
+ if m.isStopping {
+ m.Unlock()
+ return
+ }
// println("loop interval")
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
+ m.Unlock()
}
}