aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/erasure_coding/ec_volume.go5
-rw-r--r--weed/storage/store.go11
-rw-r--r--weed/storage/store_ec.go9
-rw-r--r--weed/storage/store_ec_delete.go3
-rw-r--r--weed/storage/volume_backup.go3
5 files changed, 19 insertions, 12 deletions
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index a6e880350..f4cde310f 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -3,6 +3,7 @@ package erasure_coding
import (
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/volume_info"
"math"
"os"
@@ -30,7 +31,7 @@ type EcVolume struct {
ecxFileSize int64
ecxCreatedAt time.Time
Shards []*EcVolumeShard
- ShardLocations map[ShardId][]string
+ ShardLocations map[ShardId][]pb.ServerAddress
ShardLocationsRefreshTime time.Time
ShardLocationsLock sync.RWMutex
Version needle.Version
@@ -69,7 +70,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
}
- ev.ShardLocations = make(map[ShardId][]string)
+ ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
return
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index b5f8f9c4b..8381705d6 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/volume_info"
"github.com/chrislusf/seaweedfs/weed/util"
"path/filepath"
@@ -31,11 +32,12 @@ type ReadOption struct {
* A VolumeServer contains one Store
*/
type Store struct {
- MasterAddress string
+ MasterAddress pb.ServerAddress
grpcDialOption grpc.DialOption
volumeSizeLimit uint64 // read from the master
Ip string
Port int
+ GrpcPort int
PublicUrl string
Locations []*DiskLocation
dataCenter string // optional informaton, overwriting master setting if exists
@@ -50,13 +52,13 @@ type Store struct {
}
func (s *Store) String() (str string) {
- str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
+ str = fmt.Sprintf("Ip:%s, Port:%d, GrpcPort:%d PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.GrpcPort, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
return
}
-func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int,
+func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int,
minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) {
- s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
+ s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpaces[i], idxFolder, diskTypes[i])
@@ -311,6 +313,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
return &master_pb.Heartbeat{
Ip: s.Ip,
Port: uint32(s.Port),
+ GrpcPort: uint32(s.GrpcPort),
PublicUrl: s.PublicUrl,
MaxVolumeCounts: maxVolumeCounts,
MaxFileKey: NeedleIdToUint64(maxFileKey),
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 6ba7237e2..0e33a8e32 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"os"
"sort"
@@ -255,7 +256,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume)
shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
delete(ecVolume.ShardLocations, shardId)
for _, loc := range shardIdLocations.Locations {
- ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
+ ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], pb.NewServerAddressFromLocation(loc))
}
}
ecVolume.ShardLocationsRefreshTime = time.Now()
@@ -266,7 +267,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume)
return
}
-func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
if len(sourceDataNodes) == 0 {
return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
@@ -284,7 +285,7 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId typ
return
}
-func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
@@ -349,7 +350,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolum
// read from remote locations
wg.Add(1)
- go func(shardId erasure_coding.ShardId, locations []string) {
+ go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) {
defer wg.Done()
data := make([]byte, len(buf))
nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
index 6c10af3c5..d40165ff5 100644
--- a/weed/storage/store_ec_delete.go
+++ b/weed/storage/store_ec_delete.go
@@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -85,7 +86,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh
}
-func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
+func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 82ea12a89..7fadd6fef 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"os"
@@ -62,7 +63,7 @@ update needle map when receiving new .dat bytes. But seems not necessary now.)
*/
-func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
+func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption) error {
startFromOffset, _, _ := v.FileStat()
appendAtNs, err := v.findLastAppendAtNs()