aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker')
-rw-r--r--weed/messaging/broker/broker_append.go8
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go10
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go10
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go10
-rw-r--r--weed/messaging/broker/broker_server.go12
-rw-r--r--weed/messaging/broker/topic_manager.go4
6 files changed, 27 insertions, 27 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 8e5b56fd0..505560ce0 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -5,7 +5,7 @@ import (
"fmt"
"io"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -34,7 +34,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
_, err := client.AppendToEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("append to file %v: %v", request, err)
+ log.Infof("append to file %v: %v", request, err)
return err
}
@@ -61,7 +61,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ log.Infof("assign volume failure %v: %v", request, err)
return err
}
if resp.Error != "" {
@@ -98,7 +98,7 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
if err == io.EOF {
return
}
- glog.V(0).Infof("fail to connect to %s: %v", filer, err)
+ log.Infof("fail to connect to %s: %v", filer, err)
} else {
break
}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
index 3c14f3220..594ce057f 100644
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -5,7 +5,7 @@ import (
"fmt"
"time"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
@@ -78,11 +78,11 @@ func (broker *MessageBroker) checkFilers() {
found = true
break
}
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
+ log.Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
time.Sleep(time.Second)
}
}
- glog.V(0).Infof("received master list: %s", masters)
+ log.Infof("received master list: %s", masters)
// contact each masters for filers
var filers []string
@@ -105,11 +105,11 @@ func (broker *MessageBroker) checkFilers() {
found = true
break
}
- glog.V(0).Infof("failed to list filers: %v", err)
+ log.Infof("failed to list filers: %v", err)
time.Sleep(time.Second)
}
}
- glog.V(0).Infof("received filer list: %s", filers)
+ log.Infof("received filer list: %s", filers)
broker.option.Filers = filers
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index 6e6b723d1..515c70b96 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -8,7 +8,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -65,7 +65,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
for {
// println("recv")
in, err := stream.Recv()
- // glog.V(0).Infof("recieved %v err: %v", in, err)
+ // log.Infof("recieved %v err: %v", in, err)
if err == io.EOF {
return nil
}
@@ -81,7 +81,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
data, err := proto.Marshal(in.Data)
if err != nil {
- glog.Errorf("marshall error: %v\n", err)
+ log.Errorf("marshall error: %v\n", err)
continue
}
@@ -97,7 +97,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
}
if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
- glog.V(0).Infof("err writing %s: %v", md5File, err)
+ log.Infof("err writing %s: %v", md5File, err)
}
// fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
@@ -105,7 +105,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
// send the close ack
// println("server send ack closing")
if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
- glog.V(0).Infof("err sending close response: %v", err)
+ log.Infof("err sending close response: %v", err)
}
return nil
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index df4052096..2bb41dcef 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -10,7 +10,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -76,7 +76,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
Data: m,
})
if err != nil {
- glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
+ log.Infof("=> subscriber %v: %+v", subscriberId, err)
}
return err
}
@@ -84,12 +84,12 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
m := &messaging_pb.Message{}
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ log.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
return err
}
// fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
if err = eachMessageFn(m); err != nil {
- glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
+ log.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
}
if m.IsClose {
@@ -122,7 +122,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return isConnected
}, eachLogEntryFn)
if err != nil {
- glog.Errorf("processed to %v: %v", lastReadTime, err)
+ log.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
break
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 06162471c..8d540755f 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -6,7 +6,7 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -52,7 +52,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
defer cancel()
stream, err := client.KeepConnected(ctx)
if err != nil {
- glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ log.Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
@@ -67,24 +67,24 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
Name: broker.option.Ip,
GrpcPort: uint32(broker.option.Port),
}); err != nil {
- glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ log.Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
// TODO send events of adding/removing topics
- glog.V(0).Infof("conntected with filer: %v", filer)
+ log.Infof("conntected with filer: %v", filer)
for {
if err := stream.Send(&filer_pb.KeepConnectedRequest{
Name: broker.option.Ip,
GrpcPort: uint32(broker.option.Port),
}); err != nil {
- glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ log.Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
// println("send heartbeat")
if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ log.Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
// println("received reply")
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
index edddca813..b7705281e 100644
--- a/weed/messaging/broker/topic_manager.go
+++ b/weed/messaging/broker/topic_manager.go
@@ -6,7 +6,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)
@@ -65,7 +65,7 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topi
)
if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ log.Infof("log write failed %s: %v", targetFile, err)
}
}
logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {