aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/store.go')
-rw-r--r--weed/storage/store.go73
1 files changed, 54 insertions, 19 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 61d55447d..5b2110bd8 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -18,18 +18,18 @@ const (
* A VolumeServer contains one Store
*/
type Store struct {
- Ip string
- Port int
- PublicUrl string
- Locations []*DiskLocation
- dataCenter string //optional informaton, overwriting master setting if exists
- rack string //optional information, overwriting master setting if exists
- connected bool
- volumeSizeLimit uint64 //read from the master
- Client master_pb.Seaweed_SendHeartbeatClient
- NeedleMapType NeedleMapType
- NewVolumeIdChan chan needle.VolumeId
- DeletedVolumeIdChan chan needle.VolumeId
+ Ip string
+ Port int
+ PublicUrl string
+ Locations []*DiskLocation
+ dataCenter string //optional informaton, overwriting master setting if exists
+ rack string //optional information, overwriting master setting if exists
+ connected bool
+ volumeSizeLimit uint64 //read from the master
+ Client master_pb.Seaweed_SendHeartbeatClient
+ NeedleMapType NeedleMapType
+ NewVolumesChan chan master_pb.VolumeShortInformationMessage
+ DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
}
func (s *Store) String() (str string) {
@@ -45,8 +45,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
}
- s.NewVolumeIdChan = make(chan needle.VolumeId, 3)
- s.DeletedVolumeIdChan = make(chan needle.VolumeId, 3)
+ s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
+ s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
return
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
@@ -67,7 +67,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {
if e != nil {
return
}
- // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumeIdChan
+ // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan
}
return
}
@@ -100,7 +100,13 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil {
location.SetVolume(vid, volume)
- s.NewVolumeIdChan <- vid
+ glog.V(0).Info("add volume", vid)
+ s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
+ Id: uint32(vid),
+ Collection: collection,
+ ReplicaPlacement: uint32(replicaPlacement.Byte()),
+ Ttl: ttl.ToUint32(),
+ }
return nil
} else {
return err
@@ -229,7 +235,14 @@ func (s *Store) HasVolume(i needle.VolumeId) bool {
func (s *Store) MountVolume(i needle.VolumeId) error {
for _, location := range s.Locations {
if found := location.LoadVolume(i, s.NeedleMapType); found == true {
- s.NewVolumeIdChan <- needle.VolumeId(i)
+ glog.V(0).Infof("mount volume %d", i)
+ v := s.findVolume(i)
+ s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
+ Id: uint32(v.Id),
+ Collection: v.Collection,
+ ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
+ Ttl: v.Ttl.ToUint32(),
+ }
return nil
}
}
@@ -238,9 +251,20 @@ func (s *Store) MountVolume(i needle.VolumeId) error {
}
func (s *Store) UnmountVolume(i needle.VolumeId) error {
+ v := s.findVolume(i)
+ if v == nil {
+ return nil
+ }
+ message := master_pb.VolumeShortInformationMessage{
+ Id: uint32(v.Id),
+ Collection: v.Collection,
+ ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
+ Ttl: v.Ttl.ToUint32(),
+ }
for _, location := range s.Locations {
if err := location.UnloadVolume(i); err == nil {
- s.DeletedVolumeIdChan <- needle.VolumeId(i)
+ glog.V(0).Infof("UnmountVolume %d", i)
+ s.DeletedVolumesChan <- message
return nil
}
}
@@ -249,9 +273,20 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
}
func (s *Store) DeleteVolume(i needle.VolumeId) error {
+ v := s.findVolume(i)
+ if v == nil {
+ return nil
+ }
+ message := master_pb.VolumeShortInformationMessage{
+ Id: uint32(v.Id),
+ Collection: v.Collection,
+ ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
+ Ttl: v.Ttl.ToUint32(),
+ }
for _, location := range s.Locations {
if error := location.deleteVolumeById(i); error == nil {
- s.DeletedVolumeIdChan <- needle.VolumeId(i)
+ glog.V(0).Infof("DeleteVolume %d", i)
+ s.DeletedVolumesChan <- message
return nil
}
}