aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-11-12 02:21:22 -0800
committerChris Lu <chris.lu@gmail.com>2013-11-12 02:21:22 -0800
commit3b687111399fd08468e4a6232bcbe6068df961bf (patch)
treecde07f3ba6c9ae411d55b25c331bca6827253c30
parent8f0e2f31afad1fcb2f06ef3ae55866313b7b4b02 (diff)
downloadseaweedfs-3b687111399fd08468e4a6232bcbe6068df961bf.tar.xz
seaweedfs-3b687111399fd08468e4a6232bcbe6068df961bf.zip
support for collections!
-rw-r--r--go/operation/allocate_volume.go3
-rw-r--r--go/replication/volume_growth.go34
-rw-r--r--go/storage/store.go29
-rw-r--r--go/storage/volume.go32
-rw-r--r--go/storage/volume_info.go1
-rw-r--r--go/topology/collection.go38
-rw-r--r--go/topology/topo_test.go7
-rw-r--r--go/topology/topology.go39
-rw-r--r--go/topology/topology_compact.go14
-rw-r--r--go/topology/topology_event_handling.go6
-rw-r--r--go/topology/topology_map.go10
-rw-r--r--go/weed/compact.go7
-rw-r--r--go/weed/export.go3
-rw-r--r--go/weed/fix.go10
-rw-r--r--go/weed/master.go16
-rw-r--r--go/weed/volume.go6
16 files changed, 163 insertions, 92 deletions
diff --git a/go/operation/allocate_volume.go b/go/operation/allocate_volume.go
index ea34901ef..dee114f21 100644
--- a/go/operation/allocate_volume.go
+++ b/go/operation/allocate_volume.go
@@ -13,9 +13,10 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error {
+func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, repType storage.ReplicationType) error {
values := make(url.Values)
values.Add("volume", vid.String())
+ values.Add("collection", collection)
values.Add("replicationType", repType.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil {
diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go
index 2adf72dee..6e5bf1f5c 100644
--- a/go/replication/volume_growth.go
+++ b/go/replication/volume_growth.go
@@ -32,27 +32,27 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
}
-func (vg *VolumeGrowth) AutomaticGrowByType(repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) {
+func (vg *VolumeGrowth) AutomaticGrowByType(collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) {
factor := 1
switch repType {
case storage.Copy000:
factor = 1
- count, err = vg.GrowByCountAndType(vg.copy1factor, repType, dataCenter, topo)
+ count, err = vg.GrowByCountAndType(vg.copy1factor, collection, repType, dataCenter, topo)
case storage.Copy001:
factor = 2
- count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
+ count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
case storage.Copy010:
factor = 2
- count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
+ count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
case storage.Copy100:
factor = 2
- count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
+ count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
case storage.Copy110:
factor = 3
- count, err = vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo)
+ count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
case storage.Copy200:
factor = 3
- count, err = vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo)
+ count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
default:
err = errors.New("Unknown Replication Type!")
}
@@ -61,7 +61,7 @@ func (vg *VolumeGrowth) AutomaticGrowByType(repType storage.ReplicationType, dat
}
return count, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
@@ -70,7 +70,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
case storage.Copy000:
for i := 0; i < count; i++ {
if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
- if err = vg.grow(topo, *vid, repType, server); err == nil {
+ if err = vg.grow(topo, *vid, collection, repType, server); err == nil {
counter++
} else {
return counter, err
@@ -89,7 +89,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
newNodeList := topology.NewNodeList(rack.Children(), exclusion)
if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
- if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
+ if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
counter++
}
}
@@ -107,7 +107,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
newNodeList := topology.NewNodeList(dc.Children(), exclusion)
if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
- if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
+ if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
counter++
}
}
@@ -129,7 +129,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
}
if len(servers) == 2 {
- if err = vg.grow(topo, vid, repType, servers...); err == nil {
+ if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
counter++
}
}
@@ -168,7 +168,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
}
if len(servers) == 3 {
- if err = vg.grow(topo, vid, repType, servers...); err == nil {
+ if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
counter++
}
}
@@ -189,7 +189,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
}
if len(servers) == 3 {
- if err = vg.grow(topo, vid, repType, servers...); err == nil {
+ if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
counter++
}
}
@@ -198,10 +198,10 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
return
}
-func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
+func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers {
- if err := operation.AllocateVolume(server, vid, repType); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion}
+ if err := operation.AllocateVolume(server, vid, collection, repType); err == nil {
+ vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)
diff --git a/go/storage/store.go b/go/storage/store.go
index e5dc92bf6..cc9fef7d0 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -39,7 +39,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
-func (s *Store) AddVolume(volumeListString string, replicationType string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, replicationType string) error {
rt, e := NewReplicationTypeFromString(replicationType)
if e != nil {
return e
@@ -51,7 +51,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), rt)
+ e = s.addVolume(VolumeId(id), collection, rt)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -63,7 +63,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), rt); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
e = err
}
}
@@ -90,13 +90,13 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicationType ReplicationType) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %s already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType)
- if volume, err := NewVolume(location.directory, vid, replicationType); err == nil {
+ glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicationType =", replicationType)
+ if volume, err := NewVolume(location.directory, collection, vid, replicationType); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -158,12 +158,17 @@ func (l *DiskLocation) loadExistingVolumes() {
for _, dir := range dirs {
name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
+ collection := ""
base := name[:len(name)-len(".dat")]
+ i := strings.Index(base, "_")
+ if i > 0 {
+ collection, base = base[0:i], base[i+1:]
+ }
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.directory, vid, CopyNil); e == nil {
+ if v, e := NewVolume(l.directory, collection, vid, CopyNil); e == nil {
l.volumes[vid] = v
- glog.V(0).Infoln("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
+ glog.V(0).Infoln("data file", l.directory+"/"+name, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
}
}
}
@@ -177,7 +182,9 @@ func (s *Store) Status() []*VolumeInfo {
for _, location := range s.locations {
for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
- RepType: v.ReplicaType, Version: v.Version(),
+ Collection: v.Collection,
+ RepType: v.ReplicaType,
+ Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
@@ -208,7 +215,9 @@ func (s *Store) Join() error {
maxVolumeCount = maxVolumeCount + location.maxVolumeCount
for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
- RepType: v.ReplicaType, Version: v.Version(),
+ Collection: v.Collection,
+ RepType: v.ReplicaType,
+ Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
diff --git a/go/storage/volume.go b/go/storage/volume.go
index bdc07fc58..db3a66c1b 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -29,32 +29,38 @@ func (s *SuperBlock) Bytes() []byte {
}
type Volume struct {
- Id VolumeId
- dir string
- dataFile *os.File
- nm NeedleMapper
- readOnly bool
+ Id VolumeId
+ dir string
+ Collection string
+ dataFile *os.File
+ nm NeedleMapper
+ readOnly bool
SuperBlock
accessLock sync.Mutex
}
-func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
- v = &Volume{dir: dirname, Id: id}
+func NewVolume(dirname string, collection string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
+ v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaType: replicationType}
e = v.load(true)
return
}
-func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) {
- v = &Volume{dir: dirname, Id: id}
+func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
+ v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
e = v.load(false)
return
}
func (v *Volume) load(alsoLoadIndex bool) error {
var e error
- fileName := path.Join(v.dir, v.Id.String())
+ var fileName string
+ if v.Collection == "" {
+ fileName = path.Join(v.dir, v.Id.String())
+ } else {
+ fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
+ }
if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
} else if !exists || canWrite {
@@ -309,11 +315,11 @@ func (v *Volume) freeze() error {
return nil
}
-func ScanVolumeFile(dirname string, id VolumeId,
+func ScanVolumeFile(dirname string, collection string, id VolumeId,
visitSuperBlock func(SuperBlock) error,
visitNeedle func(n *Needle, offset int64) error) (err error) {
var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, id); err != nil {
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
return
}
if err = visitSuperBlock(v.SuperBlock); err != nil {
@@ -365,7 +371,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize)
- err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
+ err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes())
return err
}, func(n *Needle, offset int64) error {
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 5a83b6e36..c8eb7612e 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -6,6 +6,7 @@ type VolumeInfo struct {
Id VolumeId
Size uint64
RepType ReplicationType
+ Collection string
Version Version
FileCount int
DeleteCount int
diff --git a/go/topology/collection.go b/go/topology/collection.go
new file mode 100644
index 000000000..0a7971424
--- /dev/null
+++ b/go/topology/collection.go
@@ -0,0 +1,38 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+)
+
+type Collection struct {
+ Name string
+ volumeSizeLimit uint64
+ replicaType2VolumeLayout []*VolumeLayout
+}
+
+func NewCollection(name string, volumeSizeLimit uint64) *Collection {
+ c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
+ c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ return c
+}
+
+func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
+ replicationTypeIndex := repType.GetReplicationLevelIndex()
+ if c.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ glog.V(0).Infoln("collection", c.Name, "adding replication type", repType)
+ c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit)
+ }
+ return c.replicaType2VolumeLayout[replicationTypeIndex]
+}
+
+func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+ for _, vl := range c.replicaType2VolumeLayout {
+ if vl != nil {
+ if list := vl.Lookup(vid); list != nil {
+ return list
+ }
+ }
+ }
+ return nil
+}
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
index 36f4963db..c0edca7c1 100644
--- a/go/topology/topo_test.go
+++ b/go/topology/topo_test.go
@@ -99,9 +99,10 @@ func setup(topologyLayout string) *Topology {
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
- Id: storage.VolumeId(int64(m["id"].(float64))),
- Size: uint64(m["size"].(float64)),
- Version: storage.CurrentVersion}
+ Id: storage.VolumeId(int64(m["id"].(float64))),
+ Size: uint64(m["size"].(float64)),
+ Collection: "testingCollection",
+ Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
diff --git a/go/topology/topology.go b/go/topology/topology.go
index b21601210..5b3d29e0b 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -12,8 +12,7 @@ import (
type Topology struct {
NodeImpl
- //transient vid~servers mapping for each replication type
- replicaType2VolumeLayout []*VolumeLayout
+ collectionMap map[string]*Collection
pulse int64
@@ -34,7 +33,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
- t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ t.collectionMap = make(map[string]*Collection)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
@@ -60,13 +59,18 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
return nil
}
-func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- if list := vl.Lookup(vid); list != nil {
+func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+ //maybe an issue if lots of collections?
+ if collection == "" {
+ for _, c := range t.collectionMap {
+ if list := c.Lookup(vid); list != nil {
return list
}
}
+ } else {
+ if c, ok := t.collectionMap[collection]; ok {
+ return c.Lookup(vid)
+ }
}
return nil
}
@@ -86,12 +90,8 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return vid.Next()
}
-func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
- }
- vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter)
+func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
+ vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -99,17 +99,16 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, data
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- glog.V(0).Infoln("adding replication type", repType)
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
+func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout {
+ _, ok := t.collectionMap[collectionName]
+ if !ok {
+ t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.replicaType2VolumeLayout[replicationTypeIndex]
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType)
}
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
+ t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn)
}
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
diff --git a/go/topology/topology_compact.go b/go/topology/topology_compact.go
index 4ba77a4a5..a1d6d2564 100644
--- a/go/topology/topology_compact.go
+++ b/go/topology/topology_compact.go
@@ -79,12 +79,14 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
return isCommitSuccess
}
func (t *Topology) Vacuum(garbageThreshold string) int {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- for vid, locationlist := range vl.vid2location {
- if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(vl, vid, locationlist) {
- batchVacuumVolumeCommit(vl, vid, locationlist)
+ for _, c := range t.collectionMap {
+ for _, vl := range c.replicaType2VolumeLayout {
+ if vl != nil {
+ for vid, locationlist := range vl.vid2location {
+ if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(vl, vid, locationlist) {
+ batchVacuumVolumeCommit(vl, vid, locationlist)
+ }
}
}
}
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index f3b09c649..7f81d8184 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -37,7 +37,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.RepType)
+ vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@@ -49,7 +49,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.RepType)
+ vl := t.GetVolumeLayout(v.Collection, v.RepType)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -59,7 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.RepType)
+ vl := t.GetVolumeLayout(v.Collection, v.RepType)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index b416ee943..f66d4c251 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -13,9 +13,13 @@ func (t *Topology) ToMap() interface{} {
}
m["DataCenters"] = dcs
var layouts []interface{}
- for _, layout := range t.replicaType2VolumeLayout {
- if layout != nil {
- layouts = append(layouts, layout.ToMap())
+ for _, c := range t.collectionMap {
+ for _, layout := range c.replicaType2VolumeLayout {
+ if layout != nil {
+ tmp := layout.ToMap()
+ tmp["collection"] = c.Name
+ layouts = append(layouts, tmp)
+ }
}
}
m["layouts"] = layouts
diff --git a/go/weed/compact.go b/go/weed/compact.go
index 30ae6abd2..2600b3362 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -21,8 +21,9 @@ var cmdCompact = &Command{
}
var (
- compactVolumePath = cmdCompact.Flag.String("dir", "/tmp", "data directory to store files")
- compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
+ compactVolumePath = cmdCompact.Flag.String("dir", "/tmp", "data directory to store files")
+ compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name")
+ compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
)
func runCompact(cmd *Command, args []string) bool {
@@ -32,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(*compactVolumePath, vid, storage.CopyNil)
+ v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.CopyNil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/go/weed/export.go b/go/weed/export.go
index 0c5a6c227..e6644adc7 100644
--- a/go/weed/export.go
+++ b/go/weed/export.go
@@ -35,6 +35,7 @@ var cmdExport = &Command{
var (
exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files")
+ exportCollection = cmdExport.Flag.String("collection", "", "the volume collection name")
exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}")
@@ -95,7 +96,7 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version
- err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error {
+ err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error {
version = superBlock.Version
return nil
}, func(n *storage.Needle, offset int64) error {
diff --git a/go/weed/fix.go b/go/weed/fix.go
index c97fd60d3..159e2dbde 100644
--- a/go/weed/fix.go
+++ b/go/weed/fix.go
@@ -22,8 +22,9 @@ var cmdFix = &Command{
}
var (
- fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
- fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
+ fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
+ fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name")
+ fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
)
func runFix(cmd *Command, args []string) bool {
@@ -33,6 +34,9 @@ func runFix(cmd *Command, args []string) bool {
}
fileName := strconv.Itoa(*fixVolumeId)
+ if *fixVolumeCollection != "" {
+ fileName = *fixVolumeCollection + "_" + fileName
+ }
indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
@@ -43,7 +47,7 @@ func runFix(cmd *Command, args []string) bool {
defer nm.Close()
vid := storage.VolumeId(*fixVolumeId)
- err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error {
+ err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, func(superBlock storage.SuperBlock) error {
return nil
}, func(n *storage.Needle, offset int64) error {
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
diff --git a/go/weed/master.go b/go/weed/master.go
index 950aaca6d..3beecaaf9 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -56,13 +56,14 @@ var vgLock sync.Mutex
func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
vid := r.FormValue("volumeId")
+ collection := r.FormValue("collection") //optional, but can be faster if too many collections
commaSep := strings.Index(vid, ",")
if commaSep > 0 {
vid = vid[0:commaSep]
}
volumeId, err := storage.NewVolumeId(vid)
if err == nil {
- machines := topo.Lookup(volumeId)
+ machines := topo.Lookup(collection, volumeId)
if machines != nil {
ret := []map[string]string{}
for _, dn := range machines {
@@ -88,6 +89,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
if repType == "" {
repType = *defaultRepType
}
+ collection := r.FormValue("collection")
dataCenter := r.FormValue("dataCenter")
rt, err := storage.NewReplicationTypeFromString(repType)
if err != nil {
@@ -96,7 +98,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
return
}
- if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 {
+ if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
if topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
@@ -104,15 +106,15 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
} else {
vgLock.Lock()
defer vgLock.Unlock()
- if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 {
- if _, err = vg.AutomaticGrowByType(rt, dataCenter, topo); err != nil {
+ if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
+ if _, err = vg.AutomaticGrowByType(collection, rt, dataCenter, topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return
}
}
}
}
- fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter)
+ fid, count, dn, err := topo.PickForWrite(collection, rt, c, dataCenter)
if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
@@ -168,7 +170,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
if topo.FreeSpace() < count*rt.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
} else {
- count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCneter"), topo)
+ count, err = vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), topo)
}
} else {
err = errors.New("parameter count is not found")
@@ -197,7 +199,7 @@ func redirectHandler(w http.ResponseWriter, r *http.Request) {
debug("parsing error:", err, r.URL.Path)
return
}
- machines := topo.Lookup(volumeId)
+ machines := topo.Lookup("", volumeId)
if machines != nil && len(machines) > 0 {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
} else {
diff --git a/go/weed/volume.go b/go/weed/volume.go
index cf58af799..87d42e227 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -56,13 +56,13 @@ func statusHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, m)
}
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
- err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
+ err := store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType"))
if err == nil {
writeJsonQuiet(w, r, map[string]string{"error": ""})
} else {
writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
}
- debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
+ debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
}
func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
@@ -112,6 +112,8 @@ func storeHandler(w http.ResponseWriter, r *http.Request) {
GetOrHeadHandler(w, r, false)
case "DELETE":
secure(volumeWhiteList, DeleteHandler)(w, r)
+ case "PUT":
+ secure(volumeWhiteList, PostHandler)(w, r)
case "POST":
secure(volumeWhiteList, PostHandler)(w, r)
}