diff options
Diffstat (limited to 'weed/pb')
| -rw-r--r-- | weed/pb/filer_pb/filer_client.go | 14 | ||||
| -rw-r--r-- | weed/pb/grpc_client_server.go | 65 | ||||
| -rw-r--r-- | weed/pb/volume_info.go | 29 |
3 files changed, 58 insertions, 50 deletions
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 079fbd671..e4d8bee34 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "os" "strings" "time" @@ -101,12 +102,16 @@ func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string, } func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { - + // Redundancy limit to make it correctly judge whether it is the last file. + redLimit := limit + if limit != math.MaxInt32 && limit != 0{ + redLimit = limit + 1 + } request := &ListEntriesRequest{ Directory: string(fullDirPath), Prefix: prefix, StartFromFileName: startFrom, - Limit: limit, + Limit: redLimit, InclusiveStartFrom: inclusive, } @@ -119,6 +124,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix } var prevEntry *Entry + count := 0 for { resp, recvErr := stream.Recv() if recvErr != nil { @@ -139,6 +145,10 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix } } prevEntry = resp.Entry + count++ + if count > int(limit) && limit != 0 { + prevEntry = nil + } } return nil diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 910114313..9efcd9bdc 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -3,6 +3,7 @@ package pb import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "net/http" "strconv" "strings" @@ -108,51 +109,55 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts } func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) { - colonIndex := strings.LastIndex(server, ":") - if colonIndex < 0 { - return "", fmt.Errorf("server should have hostname:port format: %v", server) - } + return ParseServerAddress(server, 10000) +} - port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64) +func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { + + host, port, parseErr := hostAndPort(server) if parseErr != nil { return "", fmt.Errorf("server port parse error: %v", parseErr) } - grpcPort := int(port) + 10000 + newPort := int(port) + deltaPort - return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil + return fmt.Sprintf("%s:%d", host, newPort), nil } -func ServerToGrpcAddress(server string) (serverGrpcAddress string) { - hostnameAndPort := strings.Split(server, ":") - if len(hostnameAndPort) != 2 { - return fmt.Sprintf("unexpected server address: %s", server) +func hostAndPort(address string) (host string, port uint64, err error) { + colonIndex := strings.LastIndex(address, ":") + if colonIndex < 0 { + return "", 0, fmt.Errorf("server should have hostname:port format: %v", address) + } + port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64) + if err != nil { + return "", 0, fmt.Errorf("server port parse error: %v", err) } - port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + return address[:colonIndex], port, err +} + +func ServerToGrpcAddress(server string) (serverGrpcAddress string) { + + host, port, parseErr := hostAndPort(server) if parseErr != nil { - return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1]) + glog.Fatalf("server address %s parse error: %v", server, parseErr) } grpcPort := int(port) + 10000 - return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort) + return fmt.Sprintf("%s:%d", host, grpcPort) } func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { - hostnameAndPort := strings.Split(grpcAddress, ":") - if len(hostnameAndPort) != 2 { - return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress) - } - - grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + host, grpcPort, parseErr := hostAndPort(grpcAddress) if parseErr != nil { - return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1]) + glog.Fatalf("server grpc address %s parse error: %v", grpcAddress, parseErr) } port := int(grpcPort) - 10000 - return fmt.Sprintf("%s:%d", hostnameAndPort[0], port) + return fmt.Sprintf("%s:%d", host, port) } func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { @@ -197,19 +202,3 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption }, filerGrpcAddress, grpcDialOption) } - -func ParseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { - hostnameAndPort := strings.Split(filer, ":") - if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) - } - - filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) - if parseErr != nil { - return "", fmt.Errorf("filer port parse error: %v", parseErr) - } - - filerGrpcPort := int(filerPort) + 10000 - - return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil -} diff --git a/weed/pb/volume_info.go b/weed/pb/volume_info.go index c4f733f5c..cae9e018f 100644 --- a/weed/pb/volume_info.go +++ b/weed/pb/volume_info.go @@ -15,40 +15,49 @@ import ( ) // MaybeLoadVolumeInfo load the file data as *volume_server_pb.VolumeInfo, the returned volumeInfo will not be nil -func MaybeLoadVolumeInfo(fileName string) (*volume_server_pb.VolumeInfo, bool, error) { +func MaybeLoadVolumeInfo(fileName string) (volumeInfo *volume_server_pb.VolumeInfo, hasRemoteFile bool, hasVolumeInfoFile bool, err error) { - volumeInfo := &volume_server_pb.VolumeInfo{} + volumeInfo = &volume_server_pb.VolumeInfo{} glog.V(1).Infof("maybeLoadVolumeInfo checks %s", fileName) if exists, canRead, _, _, _ := util.CheckFile(fileName); !exists || !canRead { if !exists { - return volumeInfo, false, nil + return } + hasVolumeInfoFile = true if !canRead { glog.Warningf("can not read %s", fileName) - return volumeInfo, false, fmt.Errorf("can not read %s", fileName) + err = fmt.Errorf("can not read %s", fileName) + return } - return volumeInfo, false, nil + return } + hasVolumeInfoFile = true + glog.V(1).Infof("maybeLoadVolumeInfo reads %s", fileName) tierData, readErr := ioutil.ReadFile(fileName) if readErr != nil { glog.Warningf("fail to read %s : %v", fileName, readErr) - return volumeInfo, false, fmt.Errorf("fail to read %s : %v", fileName, readErr) + err = fmt.Errorf("fail to read %s : %v", fileName, readErr) + return + } glog.V(1).Infof("maybeLoadVolumeInfo Unmarshal volume info %v", fileName) - if err := jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil { + if err = jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil { glog.Warningf("unmarshal error: %v", err) - return volumeInfo, false, fmt.Errorf("unmarshal error: %v", err) + err = fmt.Errorf("unmarshal error: %v", err) + return } if len(volumeInfo.GetFiles()) == 0 { - return volumeInfo, false, nil + return } - return volumeInfo, true, nil + hasRemoteFile = true + + return } func SaveVolumeInfo(fileName string, volumeInfo *volume_server_pb.VolumeInfo) error { |
