aboutsummaryrefslogtreecommitdiff
path: root/weed/pb
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb')
-rw-r--r--weed/pb/filer_pb/filer_client.go14
-rw-r--r--weed/pb/grpc_client_server.go65
-rw-r--r--weed/pb/volume_info.go29
3 files changed, 58 insertions, 50 deletions
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 079fbd671..e4d8bee34 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
+ "math"
"os"
"strings"
"time"
@@ -101,12 +102,16 @@ func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string,
}
func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
-
+ // Redundancy limit to make it correctly judge whether it is the last file.
+ redLimit := limit
+ if limit != math.MaxInt32 && limit != 0{
+ redLimit = limit + 1
+ }
request := &ListEntriesRequest{
Directory: string(fullDirPath),
Prefix: prefix,
StartFromFileName: startFrom,
- Limit: limit,
+ Limit: redLimit,
InclusiveStartFrom: inclusive,
}
@@ -119,6 +124,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
}
var prevEntry *Entry
+ count := 0
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
@@ -139,6 +145,10 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
}
}
prevEntry = resp.Entry
+ count++
+ if count > int(limit) && limit != 0 {
+ prevEntry = nil
+ }
}
return nil
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index 910114313..9efcd9bdc 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -3,6 +3,7 @@ package pb
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"net/http"
"strconv"
"strings"
@@ -108,51 +109,55 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts
}
func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
- colonIndex := strings.LastIndex(server, ":")
- if colonIndex < 0 {
- return "", fmt.Errorf("server should have hostname:port format: %v", server)
- }
+ return ParseServerAddress(server, 10000)
+}
- port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64)
+func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
+
+ host, port, parseErr := hostAndPort(server)
if parseErr != nil {
return "", fmt.Errorf("server port parse error: %v", parseErr)
}
- grpcPort := int(port) + 10000
+ newPort := int(port) + deltaPort
- return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil
+ return fmt.Sprintf("%s:%d", host, newPort), nil
}
-func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
- hostnameAndPort := strings.Split(server, ":")
- if len(hostnameAndPort) != 2 {
- return fmt.Sprintf("unexpected server address: %s", server)
+func hostAndPort(address string) (host string, port uint64, err error) {
+ colonIndex := strings.LastIndex(address, ":")
+ if colonIndex < 0 {
+ return "", 0, fmt.Errorf("server should have hostname:port format: %v", address)
+ }
+ port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64)
+ if err != nil {
+ return "", 0, fmt.Errorf("server port parse error: %v", err)
}
- port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
+ return address[:colonIndex], port, err
+}
+
+func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
+
+ host, port, parseErr := hostAndPort(server)
if parseErr != nil {
- return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
+ glog.Fatalf("server address %s parse error: %v", server, parseErr)
}
grpcPort := int(port) + 10000
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
+ return fmt.Sprintf("%s:%d", host, grpcPort)
}
func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
- hostnameAndPort := strings.Split(grpcAddress, ":")
- if len(hostnameAndPort) != 2 {
- return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress)
- }
-
- grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
+ host, grpcPort, parseErr := hostAndPort(grpcAddress)
if parseErr != nil {
- return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
+ glog.Fatalf("server grpc address %s parse error: %v", grpcAddress, parseErr)
}
port := int(grpcPort) - 10000
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], port)
+ return fmt.Sprintf("%s:%d", host, port)
}
func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
@@ -197,19 +202,3 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption
}, filerGrpcAddress, grpcDialOption)
}
-
-func ParseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(filer, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
- }
-
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("filer port parse error: %v", parseErr)
- }
-
- filerGrpcPort := int(filerPort) + 10000
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
-}
diff --git a/weed/pb/volume_info.go b/weed/pb/volume_info.go
index c4f733f5c..cae9e018f 100644
--- a/weed/pb/volume_info.go
+++ b/weed/pb/volume_info.go
@@ -15,40 +15,49 @@ import (
)
// MaybeLoadVolumeInfo load the file data as *volume_server_pb.VolumeInfo, the returned volumeInfo will not be nil
-func MaybeLoadVolumeInfo(fileName string) (*volume_server_pb.VolumeInfo, bool, error) {
+func MaybeLoadVolumeInfo(fileName string) (volumeInfo *volume_server_pb.VolumeInfo, hasRemoteFile bool, hasVolumeInfoFile bool, err error) {
- volumeInfo := &volume_server_pb.VolumeInfo{}
+ volumeInfo = &volume_server_pb.VolumeInfo{}
glog.V(1).Infof("maybeLoadVolumeInfo checks %s", fileName)
if exists, canRead, _, _, _ := util.CheckFile(fileName); !exists || !canRead {
if !exists {
- return volumeInfo, false, nil
+ return
}
+ hasVolumeInfoFile = true
if !canRead {
glog.Warningf("can not read %s", fileName)
- return volumeInfo, false, fmt.Errorf("can not read %s", fileName)
+ err = fmt.Errorf("can not read %s", fileName)
+ return
}
- return volumeInfo, false, nil
+ return
}
+ hasVolumeInfoFile = true
+
glog.V(1).Infof("maybeLoadVolumeInfo reads %s", fileName)
tierData, readErr := ioutil.ReadFile(fileName)
if readErr != nil {
glog.Warningf("fail to read %s : %v", fileName, readErr)
- return volumeInfo, false, fmt.Errorf("fail to read %s : %v", fileName, readErr)
+ err = fmt.Errorf("fail to read %s : %v", fileName, readErr)
+ return
+
}
glog.V(1).Infof("maybeLoadVolumeInfo Unmarshal volume info %v", fileName)
- if err := jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil {
+ if err = jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil {
glog.Warningf("unmarshal error: %v", err)
- return volumeInfo, false, fmt.Errorf("unmarshal error: %v", err)
+ err = fmt.Errorf("unmarshal error: %v", err)
+ return
}
if len(volumeInfo.GetFiles()) == 0 {
- return volumeInfo, false, nil
+ return
}
- return volumeInfo, true, nil
+ hasRemoteFile = true
+
+ return
}
func SaveVolumeInfo(fileName string, volumeInfo *volume_server_pb.VolumeInfo) error {