aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/scaffold.go2
-rw-r--r--weed/replication/notification_kafka.go79
2 files changed, 79 insertions, 2 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 1f76cbfea..0df82853c 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -166,6 +166,8 @@ hosts = [
"localhost:9092"
]
topic = "seaweedfs_filer1_to_filer2"
+offsetFile = "./last.offset"
+offsetSaveIntervalSeconds = 10
[sink.filer]
enabled = true
diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go
index d10175757..87f28f738 100644
--- a/weed/replication/notification_kafka.go
+++ b/weed/replication/notification_kafka.go
@@ -8,6 +8,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
+ "io/ioutil"
+ "encoding/json"
+ "sync"
+ "time"
)
func init() {
@@ -30,10 +34,12 @@ func (k *KafkaInput) Initialize(configuration util.Configuration) error {
return k.initialize(
configuration.GetStringSlice("hosts"),
configuration.GetString("topic"),
+ configuration.GetString("offsetFile"),
+ configuration.GetInt("offsetSaveIntervalSeconds"),
)
}
-func (k *KafkaInput) initialize(hosts []string, topic string) (err error) {
+func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
k.consumer, err = sarama.NewConsumer(hosts, config)
@@ -51,8 +57,25 @@ func (k *KafkaInput) initialize(hosts []string, topic string) (err error) {
panic(err)
}
+ progress := loadProgress(offsetFile)
+ if progress == nil || progress.Topic != topic {
+ progress = &KafkaProgress{
+ Topic: topic,
+ PartitionOffsets: make(map[int32]int64),
+ }
+ }
+ progress.lastSaveTime = time.Now()
+ progress.offsetFile = offsetFile
+ progress.offsetSaveIntervalSeconds = offsetSaveIntervalSeconds
+
for _, partition := range partitions {
- partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
+ offset, found := progress.PartitionOffsets[partition]
+ if !found {
+ offset = sarama.OffsetOldest
+ } else {
+ offset += 1
+ }
+ partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset)
if err != nil {
panic(err)
}
@@ -63,6 +86,9 @@ func (k *KafkaInput) initialize(hosts []string, topic string) (err error) {
fmt.Println(err)
case msg := <-partitionConsumer.Messages():
k.messageChan <- msg
+ if err := progress.setOffset(msg.Partition, msg.Offset); err != nil {
+ glog.Warningf("set kafka offset: %v", err)
+ }
}
}
}()
@@ -81,3 +107,52 @@ func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotifi
return
}
+
+type KafkaProgress struct {
+ Topic string `json:"topic"`
+ PartitionOffsets map[int32]int64 `json:"partitionOffsets"`
+ offsetFile string
+ lastSaveTime time.Time
+ offsetSaveIntervalSeconds int
+ sync.Mutex
+}
+
+func loadProgress(offsetFile string) *KafkaProgress {
+ progress := &KafkaProgress{}
+ data, err := ioutil.ReadFile(offsetFile)
+ if err != nil {
+ glog.Warningf("failed to read kafka progress file: %s", offsetFile)
+ return nil
+ }
+ err = json.Unmarshal(data, progress)
+ if err != nil {
+ glog.Warningf("failed to read kafka progress message: %s", string(data))
+ return nil
+ }
+ return progress
+}
+
+func (progress *KafkaProgress) saveProgress() error {
+ data, err := json.Marshal(progress)
+ if err != nil {
+ return fmt.Errorf("failed to marshal progress: %v", err)
+ }
+ err = ioutil.WriteFile(progress.offsetFile, data, 0640)
+ if err != nil {
+ return fmt.Errorf("failed to save progress to %s: %v", progress.offsetFile, err)
+ }
+
+ progress.lastSaveTime = time.Now()
+ return nil
+}
+
+func (progress *KafkaProgress) setOffset(parition int32, offset int64) error {
+ progress.Lock()
+ defer progress.Unlock()
+
+ progress.PartitionOffsets[parition] = offset
+ if int(time.Now().Sub(progress.lastSaveTime).Seconds()) > progress.offsetSaveIntervalSeconds {
+ return progress.saveProgress()
+ }
+ return nil
+}