aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/master_grpc_server.go')
-rw-r--r--weed/server/master_grpc_server.go54
1 files changed, 27 insertions, 27 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index dcf279e1d..30a4dfd88 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -19,7 +19,7 @@ import (
"github.com/seaweedfs/raft"
"google.golang.org/grpc/peer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/topology"
@@ -39,7 +39,7 @@ func (ms *MasterServer) RegisterUuids(heartbeat *master_pb.Heartbeat) (duplicate
index := sort.SearchStrings(v, id)
if index < len(v) && v[index] == id {
duplicated_uuids = append(duplicated_uuids, id)
- glog.Errorf("directory of %s on %s has been loaded", id, k)
+ log.Errorf("directory of %s on %s has been loaded", id, k)
}
}
}
@@ -48,7 +48,7 @@ func (ms *MasterServer) RegisterUuids(heartbeat *master_pb.Heartbeat) (duplicate
}
ms.Topo.UuidMap[key] = heartbeat.LocationUuids
- glog.V(0).Infof("found new uuid:%v %v , %v", key, heartbeat.LocationUuids, ms.Topo.UuidMap)
+ log.V(3).Infof("found new uuid:%v %v , %v", key, heartbeat.LocationUuids, ms.Topo.UuidMap)
return nil, nil
}
@@ -57,7 +57,7 @@ func (ms *MasterServer) UnRegisterUuids(ip string, port int) {
defer ms.Topo.UuidAccessLock.Unlock()
key := fmt.Sprintf("%s:%d", ip, port)
delete(ms.Topo.UuidMap, key)
- glog.V(0).Infof("remove volume server %v, online volume server: %v", key, ms.Topo.UuidMap)
+ log.V(3).Infof("remove volume server %v, online volume server: %v", key, ms.Topo.UuidMap)
}
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
@@ -67,7 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if dn != nil {
dn.Counter--
if dn.Counter > 0 {
- glog.V(0).Infof("disconnect phantom volume server %s:%d remaining %d", dn.Ip, dn.Port, dn.Counter)
+ log.V(3).Infof("disconnect phantom volume server %s:%d remaining %d", dn.Ip, dn.Port, dn.Counter)
return
}
@@ -87,7 +87,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// if the volume server disconnects and reconnects quickly
// the unregister and register can race with each other
ms.Topo.UnRegisterDataNode(dn)
- glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
+ log.V(3).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
ms.UnRegisterUuids(dn.Ip, dn.Port)
if ms.Topo.IsLeader() && (len(message.DeletedVids) > 0 || len(message.DeletedEcVids) > 0) {
@@ -100,9 +100,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
heartbeat, err := stream.Recv()
if err != nil {
if dn != nil {
- glog.Warningf("SendHeartbeat.Recv server %s:%d : %v", dn.Ip, dn.Port, err)
+ log.Warningf("SendHeartbeat.Recv server %s:%d : %v", dn.Ip, dn.Port, err)
} else {
- glog.Warningf("SendHeartbeat.Recv: %v", err)
+ log.Warningf("SendHeartbeat.Recv: %v", err)
}
stats.MasterReceivedHeartbeatCounter.WithLabelValues("error").Inc()
return err
@@ -112,16 +112,16 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// tell the volume servers about the leader
newLeader, err := ms.Topo.Leader()
if err != nil {
- glog.Warningf("SendHeartbeat find leader: %v", err)
+ log.Warningf("SendHeartbeat find leader: %v", err)
return err
}
if err := stream.Send(&master_pb.HeartbeatResponse{
Leader: string(newLeader),
}); err != nil {
if dn != nil {
- glog.Warningf("SendHeartbeat.Send response to %s:%d %v", dn.Ip, dn.Port, err)
+ log.Warningf("SendHeartbeat.Send response to %s:%d %v", dn.Ip, dn.Port, err)
} else {
- glog.Warningf("SendHeartbeat.Send response %v", err)
+ log.Warningf("SendHeartbeat.Send response %v", err)
}
return err
}
@@ -138,13 +138,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
- glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
+ log.V(3).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
uuidlist, err := ms.RegisterUuids(heartbeat)
if err != nil {
if stream_err := stream.Send(&master_pb.HeartbeatResponse{
DuplicatedUuids: uuidlist,
}); stream_err != nil {
- glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err)
+ log.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err)
return stream_err
}
return err
@@ -154,7 +154,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
Preallocate: ms.preallocateSize > 0,
}); err != nil {
- glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
+ log.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
}
stats.MasterReceivedHeartbeatCounter.WithLabelValues("dataNode").Inc()
@@ -163,7 +163,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
- glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+ log.V(-1).Infof("master received heartbeat %s", heartbeat.String())
stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
message := &master_pb.VolumeLocation{
@@ -201,11 +201,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
for _, v := range newVolumes {
- glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
+ log.V(3).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
message.NewVids = append(message.NewVids, uint32(v.Id))
}
for _, v := range deletedVolumes {
- glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
+ log.V(3).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
}
@@ -229,7 +229,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
stats.MasterReceivedHeartbeatCounter.WithLabelValues("ecShards").Inc()
- glog.V(4).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ log.V(-1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
// broadcast the ec vid changes to master clients
@@ -299,7 +299,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
for {
_, err := stream.Recv()
if err != nil {
- glog.V(2).Infof("- client %v: %v", clientName, err)
+ log.V(1).Infof("- client %v: %v", clientName, err)
go func() {
// consume message chan to avoid deadlock, go routine exit when message chan is closed
for range messageChan {
@@ -318,7 +318,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
select {
case message := <-messageChan:
if err := stream.Send(message); err != nil {
- glog.V(0).Infof("=> client %v: %+v", clientName, message)
+ log.V(3).Infof("=> client %v: %+v", clientName, message)
return err
}
case <-ticker.C:
@@ -342,10 +342,10 @@ func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedRespo
for client, ch := range ms.clientChans {
select {
case ch <- message:
- glog.V(4).Infof("send message to %s", client)
+ log.V(-1).Infof("send message to %s", client)
default:
stats.MasterBroadcastToFullErrorCounter.Inc()
- glog.Errorf("broadcastToClients %s message full", client)
+ log.Errorf("broadcastToClients %s message full", client)
}
}
ms.clientChansLock.RUnlock()
@@ -354,7 +354,7 @@ func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedRespo
func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
leader, err := ms.Topo.Leader()
if err != nil {
- glog.Errorf("topo leader: %v", err)
+ log.Errorf("topo leader: %v", err)
return raft.NotLeaderError
}
if err := stream.Send(&master_pb.KeepConnectedResponse{
@@ -369,7 +369,7 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
clientName = filerGroup + "." + clientType + "@" + string(clientAddress)
- glog.V(0).Infof("+ client %v", clientName)
+ log.V(3).Infof("+ client %v", clientName)
// we buffer this because otherwise we end up in a potential deadlock where
// the KeepConnected loop is no longer listening on this channel but we're
@@ -384,7 +384,7 @@ func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress p
}
func (ms *MasterServer) deleteClient(clientName string) {
- glog.V(0).Infof("- client %v", clientName)
+ log.V(3).Infof("- client %v", clientName)
ms.clientChansLock.Lock()
// close message chan, so that the KeepConnected go routine can exit
if clientChan, ok := ms.clientChans[clientName]; ok {
@@ -398,11 +398,11 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string {
// fmt.Printf("FromContext %+v\n", ctx)
pr, ok := peer.FromContext(ctx)
if !ok {
- glog.Error("failed to get peer from ctx")
+ log.Error("failed to get peer from ctx")
return ""
}
if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
+ log.Error("failed to get peer address")
return ""
}
if grpcPort == 0 {