diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume.go | 5 | ||||
| -rw-r--r-- | weed/storage/store.go | 11 | ||||
| -rw-r--r-- | weed/storage/store_ec.go | 9 | ||||
| -rw-r--r-- | weed/storage/store_ec_delete.go | 3 | ||||
| -rw-r--r-- | weed/storage/volume_backup.go | 3 |
5 files changed, 19 insertions, 12 deletions
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index a6e880350..f4cde310f 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -3,6 +3,7 @@ package erasure_coding import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/volume_info" "math" "os" @@ -30,7 +31,7 @@ type EcVolume struct { ecxFileSize int64 ecxCreatedAt time.Time Shards []*EcVolumeShard - ShardLocations map[ShardId][]string + ShardLocations map[ShardId][]pb.ServerAddress ShardLocationsRefreshTime time.Time ShardLocationsLock sync.RWMutex Version needle.Version @@ -69,7 +70,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) } - ev.ShardLocations = make(map[ShardId][]string) + ev.ShardLocations = make(map[ShardId][]pb.ServerAddress) return } diff --git a/weed/storage/store.go b/weed/storage/store.go index b5f8f9c4b..8381705d6 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/volume_info" "github.com/chrislusf/seaweedfs/weed/util" "path/filepath" @@ -31,11 +32,12 @@ type ReadOption struct { * A VolumeServer contains one Store */ type Store struct { - MasterAddress string + MasterAddress pb.ServerAddress grpcDialOption grpc.DialOption volumeSizeLimit uint64 // read from the master Ip string Port int + GrpcPort int PublicUrl string Locations []*DiskLocation dataCenter string // optional informaton, overwriting master setting if exists @@ -50,13 +52,13 @@ type Store struct { } func (s *Store) String() (str string) { - str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit()) + str = fmt.Sprintf("Ip:%s, Port:%d, GrpcPort:%d PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.GrpcPort, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit()) return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, +func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) { - s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} + s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpaces[i], idxFolder, diskTypes[i]) @@ -311,6 +313,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { return &master_pb.Heartbeat{ Ip: s.Ip, Port: uint32(s.Port), + GrpcPort: uint32(s.GrpcPort), PublicUrl: s.PublicUrl, MaxVolumeCounts: maxVolumeCounts, MaxFileKey: NeedleIdToUint64(maxFileKey), diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 6ba7237e2..0e33a8e32 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "os" "sort" @@ -255,7 +256,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) shardId := erasure_coding.ShardId(shardIdLocations.ShardId) delete(ecVolume.ShardLocations, shardId) for _, loc := range shardIdLocations.Locations { - ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url) + ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], pb.NewServerAddressFromLocation(loc)) } } ecVolume.ShardLocationsRefreshTime = time.Now() @@ -266,7 +267,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) return } -func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { if len(sourceDataNodes) == 0 { return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId) @@ -284,7 +285,7 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId typ return } -func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { @@ -349,7 +350,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolum // read from remote locations wg.Add(1) - go func(shardId erasure_coding.ShardId, locations []string) { + go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) { defer wg.Done() data := make([]byte, len(buf)) nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset) diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index 6c10af3c5..d40165ff5 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" @@ -85,7 +86,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh } -func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { +func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index 82ea12a89..7fadd6fef 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "os" @@ -62,7 +63,7 @@ update needle map when receiving new .dat bytes. But seems not necessary now.) */ -func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error { +func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption) error { startFromOffset, _, _ := v.FileStat() appendAtNs, err := v.findLastAppendAtNs() |
