diff options
Diffstat (limited to 'weed/storage/store.go')
| -rw-r--r-- | weed/storage/store.go | 73 |
1 files changed, 54 insertions, 19 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go index 61d55447d..5b2110bd8 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -18,18 +18,18 @@ const ( * A VolumeServer contains one Store */ type Store struct { - Ip string - Port int - PublicUrl string - Locations []*DiskLocation - dataCenter string //optional informaton, overwriting master setting if exists - rack string //optional information, overwriting master setting if exists - connected bool - volumeSizeLimit uint64 //read from the master - Client master_pb.Seaweed_SendHeartbeatClient - NeedleMapType NeedleMapType - NewVolumeIdChan chan needle.VolumeId - DeletedVolumeIdChan chan needle.VolumeId + Ip string + Port int + PublicUrl string + Locations []*DiskLocation + dataCenter string //optional informaton, overwriting master setting if exists + rack string //optional information, overwriting master setting if exists + connected bool + volumeSizeLimit uint64 //read from the master + Client master_pb.Seaweed_SendHeartbeatClient + NeedleMapType NeedleMapType + NewVolumesChan chan master_pb.VolumeShortInformationMessage + DeletedVolumesChan chan master_pb.VolumeShortInformationMessage } func (s *Store) String() (str string) { @@ -45,8 +45,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) } - s.NewVolumeIdChan = make(chan needle.VolumeId, 3) - s.DeletedVolumeIdChan = make(chan needle.VolumeId, 3) + s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) + s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) return } func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { @@ -67,7 +67,7 @@ func (s *Store) DeleteCollection(collection string) (e error) { if e != nil { return } - // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumeIdChan + // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan } return } @@ -100,7 +100,13 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil { location.SetVolume(vid, volume) - s.NewVolumeIdChan <- vid + glog.V(0).Info("add volume", vid) + s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ + Id: uint32(vid), + Collection: collection, + ReplicaPlacement: uint32(replicaPlacement.Byte()), + Ttl: ttl.ToUint32(), + } return nil } else { return err @@ -229,7 +235,14 @@ func (s *Store) HasVolume(i needle.VolumeId) bool { func (s *Store) MountVolume(i needle.VolumeId) error { for _, location := range s.Locations { if found := location.LoadVolume(i, s.NeedleMapType); found == true { - s.NewVolumeIdChan <- needle.VolumeId(i) + glog.V(0).Infof("mount volume %d", i) + v := s.findVolume(i) + s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ + Id: uint32(v.Id), + Collection: v.Collection, + ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), + Ttl: v.Ttl.ToUint32(), + } return nil } } @@ -238,9 +251,20 @@ func (s *Store) MountVolume(i needle.VolumeId) error { } func (s *Store) UnmountVolume(i needle.VolumeId) error { + v := s.findVolume(i) + if v == nil { + return nil + } + message := master_pb.VolumeShortInformationMessage{ + Id: uint32(v.Id), + Collection: v.Collection, + ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), + Ttl: v.Ttl.ToUint32(), + } for _, location := range s.Locations { if err := location.UnloadVolume(i); err == nil { - s.DeletedVolumeIdChan <- needle.VolumeId(i) + glog.V(0).Infof("UnmountVolume %d", i) + s.DeletedVolumesChan <- message return nil } } @@ -249,9 +273,20 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { } func (s *Store) DeleteVolume(i needle.VolumeId) error { + v := s.findVolume(i) + if v == nil { + return nil + } + message := master_pb.VolumeShortInformationMessage{ + Id: uint32(v.Id), + Collection: v.Collection, + ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), + Ttl: v.Ttl.ToUint32(), + } for _, location := range s.Locations { if error := location.deleteVolumeById(i); error == nil { - s.DeletedVolumeIdChan <- needle.VolumeId(i) + glog.V(0).Infof("DeleteVolume %d", i) + s.DeletedVolumesChan <- message return nil } } |
