aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
Diffstat (limited to 'go')
-rw-r--r--go/storage/cdb_map.go4
-rw-r--r--go/storage/needle_map.go5
-rw-r--r--go/storage/store.go14
-rw-r--r--go/storage/volume.go15
-rw-r--r--go/topology/collection.go11
-rw-r--r--go/topology/data_node.go24
-rw-r--r--go/topology/topology.go11
-rw-r--r--go/topology/topology_event_handling.go4
-rw-r--r--go/topology/volume_layout.go18
-rw-r--r--go/weed/weed_server/master_server.go1
-rw-r--r--go/weed/weed_server/master_server_handlers.go17
-rw-r--r--go/weed/weed_server/volume_server.go1
-rw-r--r--go/weed/weed_server/volume_server_handlers.go13
13 files changed, 130 insertions, 8 deletions
diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go
index 0d790cc0f..8be302111 100644
--- a/go/storage/cdb_map.go
+++ b/go/storage/cdb_map.go
@@ -64,6 +64,10 @@ func (m *cdbMap) Close() {
}
}
+func (m *cdbMap) Destroy() error {
+ return errors.New("Can not delete readonly volumes")
+}
+
func (m cdbMap) ContentSize() uint64 {
return m.FileByteCounter
}
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index 29b71ae52..ef7d3d6fd 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -14,6 +14,7 @@ type NeedleMapper interface {
Get(key uint64) (element *NeedleValue, ok bool)
Delete(key uint64) error
Close()
+ Destroy() error
ContentSize() uint64
DeletedSize() uint64
FileCount() int
@@ -155,6 +156,10 @@ func (nm *NeedleMap) Delete(key uint64) error {
func (nm *NeedleMap) Close() {
_ = nm.indexFile.Close()
}
+func (nm *NeedleMap) Destroy() error {
+ nm.Close()
+ return os.Remove(nm.indexFile.Name())
+}
func (nm NeedleMap) ContentSize() uint64 {
return nm.FileByteCounter
}
diff --git a/go/storage/store.go b/go/storage/store.go
index 2df0e6cb7..e1babd1e5 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -111,6 +111,20 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
}
return e
}
+func (s *Store) DeleteCollection(collection string) (e error) {
+ for _, location := range s.locations {
+ for k, v := range location.volumes {
+ if v.Collection == collection {
+ e = v.Destroy()
+ if e != nil {
+ return
+ }
+ delete(location.volumes, k)
+ }
+ }
+ }
+ return
+}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.locations {
if v, found := location.volumes[vid]; found {
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 69817a6d4..0301d7968 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -197,6 +197,21 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
}
return false
}
+
+func (v *Volume) Destroy() (err error) {
+ if v.readOnly {
+ err = fmt.Errorf("%s is read-only", v.dataFile)
+ return
+ }
+ v.Close()
+ err = os.Remove(v.dataFile.Name())
+ if err != nil {
+ return
+ }
+ err = v.nm.Destroy()
+ return
+}
+
func (v *Volume) write(n *Needle) (size uint32, err error) {
if v.readOnly {
err = fmt.Errorf("%s is read-only", v.dataFile)
diff --git a/go/topology/collection.go b/go/topology/collection.go
index 8042369a9..b21122d22 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -36,3 +36,14 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
return nil
}
+
+func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
+ for _, vl := range c.replicaType2VolumeLayout {
+ if vl != nil {
+ if list := vl.ListVolumeServers(); list != nil {
+ nodes = append(nodes, list...)
+ }
+ }
+ }
+ return
+}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index 3a6edb447..a83647939 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -1,8 +1,8 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
- _ "fmt"
"strconv"
)
@@ -28,12 +28,32 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
- dn.UpAdjustActiveVolumeCountDelta(1)
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(1)
+ }
dn.UpAdjustMaxVolumeId(v.Id)
} else {
dn.volumes[v.Id] = v
}
}
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
+ actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+ for _, v := range actualVolumes {
+ actualVolumeMap[v.Id] = v
+ }
+ for vid, _ := range dn.volumes {
+ glog.V(2).Infoln("Checking volume id:", vid)
+ if _, ok := actualVolumeMap[vid]; !ok {
+ glog.V(0).Infoln("Deleting volume id:", vid)
+ delete(dn.volumes, vid)
+ dn.UpAdjustVolumeCountDelta(-1)
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
+ } //TODO: adjust max volume id, if need to reclaim volume ids
+ for _, v := range actualVolumes {
+ dn.AddOrUpdateVolume(v)
+ }
+}
func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 1426f7a12..d72879035 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -99,6 +99,15 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPla
return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
}
+func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
+ collection, ok = t.collectionMap[collectionName]
+ return
+}
+
+func (t *Topology) DeleteCollection(collectionName string) {
+ delete(t.collectionMap, collectionName)
+}
+
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn)
}
@@ -112,8 +121,8 @@ func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo,
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
+ dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
- dn.AddOrUpdateVolume(v)
t.RegisterVolumeLayout(&v, dn)
}
}
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 5740c9a03..710e7b2ae 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -46,7 +46,9 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
return false
}
for _, dn := range vl.vid2location[volumeInfo.Id].list {
- dn.UpAdjustActiveVolumeCountDelta(-1)
+ if !volumeInfo.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
}
return true
}
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 40628b4a0..1a35faa5c 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -8,6 +8,7 @@ import (
"sync"
)
+// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
vid2location map[storage.VolumeId]*VolumeLocationList
@@ -56,6 +57,13 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
return nil
}
+func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
+ for _, location := range vl.vid2location {
+ nodes = append(nodes, location.list...)
+ }
+ return
+}
+
func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
@@ -134,10 +142,12 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- if vl.vid2location[vid].Remove(dn) {
- if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() {
- glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount())
- return vl.removeFromWritable(vid)
+ if location, ok := vl.vid2location[vid]; ok {
+ if location.Remove(dn) {
+ if location.Length() < vl.rp.GetCopyCount() {
+ glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
+ return vl.removeFromWritable(vid)
+ }
}
}
return false
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index c1870e79e..d17e57531 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -60,6 +60,7 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler)))
r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler)))
+ r.HandleFunc("/col/delete", ms.proxyToLeader(secure(ms.whiteList, ms.collectionDeleteHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler)))
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index 0c710429c..64786dc10 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -2,6 +2,7 @@ package weed_server
import (
"code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
"net/http"
@@ -78,6 +79,22 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
}
+func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
+ collection, ok := ms.topo.GetCollection(r.FormValue("collection"))
+ if !ok {
+ writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"})
+ return
+ }
+ for _, server := range collection.ListVolumeServers() {
+ _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
+ if err != nil {
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ return
+ }
+ }
+ ms.topo.DeleteCollection(r.FormValue("collection"))
+}
+
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
init := r.FormValue("init") == "true"
ip := r.FormValue("ip")
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index a2939f848..b4fc4f4ea 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -39,6 +39,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler))
r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler))
r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler))
+ r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler))
r.HandleFunc("/", vs.storeHandler)
go func() {
diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go
index 2f4673763..a7ea8ad87 100644
--- a/go/weed/weed_server/volume_server_handlers.go
+++ b/go/weed/weed_server/volume_server_handlers.go
@@ -29,6 +29,19 @@ func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Reque
}
glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err)
}
+func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) {
+ if "benchmark" != r.FormValue("collection") {
+ glog.V(0).Infoln("deleting collection =", r.FormValue("collection"), "!!!")
+ return
+ }
+ err := vs.store.DeleteCollection(r.FormValue("collection"))
+ if err == nil {
+ writeJsonQuiet(w, r, map[string]string{"error": ""})
+ } else {
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ }
+ glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err)
+}
func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
if err == nil {