aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-02-26 22:54:22 -0800
committerChris Lu <chris.lu@gmail.com>2013-02-26 22:54:22 -0800
commitdb8e27be6ec7daa1a188f90f61e385c04cb6b008 (patch)
tree33e53b6ec51157709bc6121adeb8b19fe668c79b
parentbd278337db4e3c1937f2d7cd1623ee9627c77619 (diff)
downloadseaweedfs-db8e27be6ec7daa1a188f90f61e385c04cb6b008.tar.xz
seaweedfs-db8e27be6ec7daa1a188f90f61e385c04cb6b008.zip
add lots of error checking by GThomas
-rw-r--r--go/directory/file_id.go2
-rw-r--r--go/operation/allocate_volume.go6
-rw-r--r--go/operation/lookup_volume_id.go4
-rw-r--r--go/operation/upload_content.go14
-rw-r--r--go/replication/volume_growth.go6
-rw-r--r--go/replication/volume_growth_test.go9
-rw-r--r--go/sequence/sequence.go15
-rw-r--r--go/storage/compact_map_perf_test.go2
-rw-r--r--go/storage/compress.go2
-rw-r--r--go/storage/needle_map.go9
-rw-r--r--go/storage/needle_read_write.go2
-rw-r--r--go/storage/store.go6
-rw-r--r--go/storage/volume.go27
-rw-r--r--go/topology/data_node.go2
-rw-r--r--go/topology/node.go2
-rw-r--r--go/topology/node_list.go2
-rw-r--r--go/topology/node_list_test.go14
-rw-r--r--go/topology/topo_test.go124
-rw-r--r--go/topology/topology.go12
-rw-r--r--go/topology/topology_compact.go4
-rw-r--r--go/topology/topology_event_handling.go2
-rw-r--r--go/topology/volume_layout.go2
-rw-r--r--go/weed/export.go4
-rw-r--r--go/weed/fix.go4
-rw-r--r--go/weed/master.go52
-rw-r--r--go/weed/shell.go14
-rw-r--r--go/weed/upload.go4
-rw-r--r--go/weed/volume.go59
-rw-r--r--go/weed/weed.go33
29 files changed, 268 insertions, 170 deletions
diff --git a/go/directory/file_id.go b/go/directory/file_id.go
index f5f6d46d6..807a10f7a 100644
--- a/go/directory/file_id.go
+++ b/go/directory/file_id.go
@@ -1,9 +1,9 @@
package directory
import (
- "encoding/hex"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util"
+ "encoding/hex"
"strings"
)
diff --git a/go/operation/allocate_volume.go b/go/operation/allocate_volume.go
index 19166eaed..ea34901ef 100644
--- a/go/operation/allocate_volume.go
+++ b/go/operation/allocate_volume.go
@@ -1,12 +1,12 @@
package operation
import (
- "encoding/json"
- "errors"
- "net/url"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
+ "encoding/json"
+ "errors"
+ "net/url"
)
type AllocateVolumeResult struct {
diff --git a/go/operation/lookup_volume_id.go b/go/operation/lookup_volume_id.go
index 8512ac918..0d8f247be 100644
--- a/go/operation/lookup_volume_id.go
+++ b/go/operation/lookup_volume_id.go
@@ -1,12 +1,12 @@
package operation
import (
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
_ "fmt"
"net/url"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
)
type Location struct {
diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go
index 0bdb697da..cae657b2c 100644
--- a/go/operation/upload_content.go
+++ b/go/operation/upload_content.go
@@ -21,9 +21,19 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
file_writer, err := body_writer.CreateFormFile("file", filename)
- io.Copy(file_writer, reader)
+ if err != nil {
+ log.Println("error creating form file", err)
+ return nil, err
+ }
+ if _, err = io.Copy(file_writer, reader); err != nil {
+ log.Println("error copying data", err)
+ return nil, err
+ }
content_type := body_writer.FormDataContentType()
- body_writer.Close()
+ if err = body_writer.Close(); err != nil {
+ log.Println("error closing body", err)
+ return nil, err
+ }
resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil {
log.Println("failing to upload to", uploadUrl)
diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go
index 747e07642..1f266f73f 100644
--- a/go/replication/volume_growth.go
+++ b/go/replication/volume_growth.go
@@ -1,12 +1,12 @@
package replication
import (
- "errors"
- "fmt"
- "math/rand"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
+ "errors"
+ "fmt"
+ "math/rand"
"sync"
)
diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go
index e35dbc707..3fbeebc9e 100644
--- a/go/replication/volume_growth_test.go
+++ b/go/replication/volume_growth_test.go
@@ -1,11 +1,11 @@
package replication
import (
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/topology"
"encoding/json"
"fmt"
"math/rand"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
"testing"
"time"
)
@@ -96,7 +96,10 @@ func setup(topologyLayout string) *topology.Topology {
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
- vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
+ vi := storage.VolumeInfo{
+ Id: storage.VolumeId(int64(m["id"].(float64))),
+ Size: uint64(m["size"].(float64)),
+ Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go
index c85289468..0237f1f80 100644
--- a/go/sequence/sequence.go
+++ b/go/sequence/sequence.go
@@ -36,10 +36,15 @@ func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
} else {
decoder := gob.NewDecoder(seqFile)
defer seqFile.Close()
- decoder.Decode(&m.FileIdSequence)
- log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
+ if se = decoder.Decode(&m.FileIdSequence); se != nil {
+ log.Printf("error decoding FileIdSequence: %s", se)
+ m.FileIdSequence = FileIdSaveInterval
+ log.Println("Setting file id sequence", m.FileIdSequence)
+ } else {
+ log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
+ m.FileIdSequence += FileIdSaveInterval
+ }
//in case the server stops between intervals
- m.FileIdSequence += FileIdSaveInterval
}
return
}
@@ -67,5 +72,7 @@ func (m *SequencerImpl) saveSequence() {
}
defer seqFile.Close()
encoder := gob.NewEncoder(seqFile)
- encoder.Encode(m.FileIdSequence)
+ if e = encoder.Encode(m.FileIdSequence); e != nil {
+ log.Fatalf("Sequence File Save [ERROR] %s\n", e)
+ }
}
diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go
index cfc53ef65..b885ebd93 100644
--- a/go/storage/compact_map_perf_test.go
+++ b/go/storage/compact_map_perf_test.go
@@ -1,9 +1,9 @@
package storage
import (
+ "code.google.com/p/weed-fs/go/util"
"log"
"os"
- "code.google.com/p/weed-fs/go/util"
"testing"
)
diff --git a/go/storage/compress.go b/go/storage/compress.go
index 256789c9c..e8816422b 100644
--- a/go/storage/compress.go
+++ b/go/storage/compress.go
@@ -10,7 +10,7 @@ import (
/*
* Default more not to gzip since gzip can be done on client side.
-*/
+ */
func IsGzippable(ext, mtype string) bool {
if strings.HasPrefix(mtype, "text/") {
return true
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index 4465fab22..b2e232009 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -94,14 +94,17 @@ func (nm *NeedleMap) Delete(key uint64) error {
util.Uint32toBytes(nm.bytes[8:12], 0)
util.Uint32toBytes(nm.bytes[12:16], 0)
if _, err = nm.indexFile.Write(nm.bytes); err != nil {
- nm.indexFile.Truncate(offset)
- return fmt.Errorf("error writing to indexfile %s: %s", nm.indexFile, err)
+ plus := ""
+ if e := nm.indexFile.Truncate(offset); e != nil {
+ plus = "\ncouldn't truncate index file: " + e.Error()
+ }
+ return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus)
}
nm.deletionCounter++
return nil
}
func (nm *NeedleMap) Close() {
- nm.indexFile.Close()
+ _ = nm.indexFile.Close()
}
func (nm *NeedleMap) ContentSize() uint64 {
return nm.fileByteCounter
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index 881db50a6..c5f27ea21 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -1,11 +1,11 @@
package storage
import (
+ "code.google.com/p/weed-fs/go/util"
"errors"
"fmt"
"io"
"os"
- "code.google.com/p/weed-fs/go/util"
)
const (
diff --git a/go/storage/store.go b/go/storage/store.go
index 857a62506..142ee84e2 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -1,12 +1,12 @@
package storage
import (
+ "code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/url"
- "code.google.com/p/weed-fs/go/util"
"strconv"
"strings"
)
@@ -175,7 +175,9 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
size, err = v.write(n)
if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
- s.Join()
+ if err = s.Join(); err != nil {
+ log.Printf("error with Join: %s", err)
+ }
}
return
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index aeb1126d7..b9c7484d7 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -86,7 +86,7 @@ func (v *Volume) Close() {
v.accessLock.Lock()
defer v.accessLock.Unlock()
v.nm.Close()
- v.dataFile.Close()
+ _ = v.dataFile.Close()
}
func (v *Volume) maybeWriteSuperBlock() error {
stat, e := v.dataFile.Stat()
@@ -101,7 +101,9 @@ func (v *Volume) maybeWriteSuperBlock() error {
return e
}
func (v *Volume) readSuperBlock() (err error) {
- v.dataFile.Seek(0, 0)
+ if _, err = v.dataFile.Seek(0, 0); err != nil {
+ return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err)
+ }
header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e)
@@ -128,7 +130,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
return
}
if size, err = n.Append(v.dataFile, v.Version()); err != nil {
- v.dataFile.Truncate(offset)
+ if e := v.dataFile.Truncate(offset); e != nil {
+ err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e)
+ }
return
}
nv, ok := v.nm.Get(n.Id)
@@ -143,9 +147,14 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
nv, ok := v.nm.Get(n.Id)
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok {
- v.nm.Delete(n.Id)
- v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0)
- _, err := n.Append(v.dataFile, v.Version())
+ var err error
+ if err = v.nm.Delete(n.Id); err != nil {
+ return nv.Size, err
+ }
+ if _, err = v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0); err != nil {
+ return nv.Size, err
+ }
+ _, err = n.Append(v.dataFile, v.Version())
return nv.Size, err
}
return 0, nil
@@ -156,7 +165,9 @@ func (v *Volume) read(n *Needle) (int, error) {
defer v.accessLock.Unlock()
nv, ok := v.nm.Get(n.Id)
if ok && nv.Offset > 0 {
- v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
+ if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
+ return -1, err
+ }
return n.Read(v.dataFile, nv.Size, v.Version())
}
return -1, errors.New("Not Found")
@@ -176,7 +187,7 @@ func (v *Volume) compact() error {
func (v *Volume) commitCompact() error {
v.accessLock.Lock()
defer v.accessLock.Unlock()
- v.dataFile.Close()
+ _ = v.dataFile.Close()
var e error
if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
return e
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index ba37f0d5f..ea4ea5d39 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -1,8 +1,8 @@
package topology
import (
- _ "fmt"
"code.google.com/p/weed-fs/go/storage"
+ _ "fmt"
"strconv"
)
diff --git a/go/topology/node.go b/go/topology/node.go
index 90826dfae..786f76702 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -1,8 +1,8 @@
package topology
import (
- "fmt"
"code.google.com/p/weed-fs/go/storage"
+ "fmt"
)
type NodeId string
diff --git a/go/topology/node_list.go b/go/topology/node_list.go
index 293f534ea..db7723714 100644
--- a/go/topology/node_list.go
+++ b/go/topology/node_list.go
@@ -1,9 +1,9 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/storage"
"fmt"
"math/rand"
- "code.google.com/p/weed-fs/go/storage"
)
type NodeList struct {
diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go
index 4cd2ebaa1..c6e530724 100644
--- a/go/topology/node_list_test.go
+++ b/go/topology/node_list_test.go
@@ -7,7 +7,11 @@ import (
)
func TestXYZ(t *testing.T) {
- topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
+ topo, err := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
+ if err != nil {
+ t.Error("cannot create new topology:", err)
+ t.FailNow()
+ }
for i := 0; i < 5; i++ {
dc := NewDataCenter("dc" + strconv.Itoa(i))
dc.activeVolumeCount = i
@@ -16,22 +20,22 @@ func TestXYZ(t *testing.T) {
}
nl := NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(1)
+ picked, ret := nl.RandomlyPickN(1, 0)
if !ret || len(picked) != 1 {
t.Error("need to randomly pick 1 node")
}
- picked, ret = nl.RandomlyPickN(4)
+ picked, ret = nl.RandomlyPickN(4, 0)
if !ret || len(picked) != 4 {
t.Error("need to randomly pick 4 nodes")
}
- picked, ret = nl.RandomlyPickN(5)
+ picked, ret = nl.RandomlyPickN(5, 0)
if !ret || len(picked) != 5 {
t.Error("need to randomly pick 5 nodes")
}
- picked, ret = nl.RandomlyPickN(6)
+ picked, ret = nl.RandomlyPickN(6, 0)
if ret || len(picked) != 0 {
t.Error("can not randomly pick 6 nodes:", ret, picked)
}
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
index f8af79b21..99e570821 100644
--- a/go/topology/topo_test.go
+++ b/go/topology/topo_test.go
@@ -1,72 +1,72 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/storage"
"encoding/json"
"fmt"
"math/rand"
- "code.google.com/p/weed-fs/go/storage"
"testing"
"time"
)
var topologyLayout = `
{
- "dc1":{
- "rack1":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":3
- },
- "server2":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":10
- }
- },
- "rack2":{
- "server1":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":4
- },
- "server2":{
- "volumes":[],
- "limit":4
- },
- "server3":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":2
- }
- }
- },
- "dc2":{
- },
- "dc3":{
- "rack2":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":3, "size":12312},
- {"id":5, "size":12312}
- ],
- "limit":4
- }
- }
- }
+ "dc1":{
+ "rack1":{
+ "server1":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":2, "size":12312},
+ {"id":3, "size":12312}
+ ],
+ "limit":3
+ },
+ "server2":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":10
+ }
+ },
+ "rack2":{
+ "server1":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":4
+ },
+ "server2":{
+ "volumes":[],
+ "limit":4
+ },
+ "server3":{
+ "volumes":[
+ {"id":2, "size":12312},
+ {"id":3, "size":12312},
+ {"id":4, "size":12312}
+ ],
+ "limit":2
+ }
+ }
+ },
+ "dc2":{
+ },
+ "dc3":{
+ "rack2":{
+ "server1":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":3, "size":12312},
+ {"id":5, "size":12312}
+ ],
+ "limit":4
+ }
+ }
+ }
}
`
@@ -78,7 +78,10 @@ func setup(topologyLayout string) *Topology {
}
//need to connect all nodes first before server adding volumes
- topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
+ topo, err := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
+ if err != nil {
+ fmt.Println("error:", err)
+ }
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey)
@@ -94,7 +97,10 @@ func setup(topologyLayout string) *Topology {
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
- vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
+ vi := storage.VolumeInfo{
+ Id: storage.VolumeId(int64(m["id"].(float64))),
+ Size: uint64(m["size"].(float64)),
+ Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 70a1ad268..74dc1cd09 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -1,12 +1,12 @@
package topology
import (
- "errors"
- "io/ioutil"
- "math/rand"
"code.google.com/p/weed-fs/go/directory"
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
+ "errors"
+ "io/ioutil"
+ "math/rand"
)
type Topology struct {
@@ -28,7 +28,7 @@ type Topology struct {
configuration *Configuration
}
-func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology {
+func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) (*Topology, error) {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
@@ -44,9 +44,9 @@ func NewTopology(id string, confFile string, dirname string, sequenceFilename st
t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan storage.VolumeInfo)
- t.loadConfiguration(confFile)
+ err := t.loadConfiguration(confFile)
- return t
+ return t, err
}
func (t *Topology) loadConfiguration(configurationFile string) error {
diff --git a/go/topology/topology_compact.go b/go/topology/topology_compact.go
index 9c9abde4f..7215edc4e 100644
--- a/go/topology/topology_compact.go
+++ b/go/topology/topology_compact.go
@@ -1,12 +1,12 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
"net/url"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
"time"
)
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 9093bf884..fd2fe3bef 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -1,9 +1,9 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/storage"
"fmt"
"math/rand"
- "code.google.com/p/weed-fs/go/storage"
"time"
)
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index b6e6e8bfe..f5c2e2360 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -1,10 +1,10 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/storage"
"errors"
"fmt"
"math/rand"
- "code.google.com/p/weed-fs/go/storage"
)
type VolumeLayout struct {
diff --git a/go/weed/export.go b/go/weed/export.go
index 6b391024a..9e65a4de3 100644
--- a/go/weed/export.go
+++ b/go/weed/export.go
@@ -3,12 +3,12 @@ package main
import (
"archive/tar"
"bytes"
+ "code.google.com/p/weed-fs/go/directory"
+ "code.google.com/p/weed-fs/go/storage"
"fmt"
"log"
"os"
"path"
- "code.google.com/p/weed-fs/go/directory"
- "code.google.com/p/weed-fs/go/storage"
"strconv"
"strings"
"text/template"
diff --git a/go/weed/fix.go b/go/weed/fix.go
index 249007252..597bc0ef9 100644
--- a/go/weed/fix.go
+++ b/go/weed/fix.go
@@ -1,10 +1,10 @@
package main
import (
+ "code.google.com/p/weed-fs/go/storage"
"log"
"os"
"path"
- "code.google.com/p/weed-fs/go/storage"
"strconv"
)
@@ -52,7 +52,7 @@ func runFix(cmd *Command, args []string) bool {
debug("saved", count, "with error", pe)
} else {
debug("skipping deleted file ...")
- nm.Delete(n.Id)
+ return nm.Delete(n.Id)
}
return nil
})
diff --git a/go/weed/master.go b/go/weed/master.go
index 3d8757c16..f6cc88df0 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -1,13 +1,13 @@
package main
import (
+ "code.google.com/p/weed-fs/go/replication"
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/topology"
"encoding/json"
"errors"
"log"
"net/http"
- "code.google.com/p/weed-fs/go/replication"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
"runtime"
"strconv"
"strings"
@@ -57,14 +57,14 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
for _, dn := range machines {
ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
}
- writeJson(w, r, map[string]interface{}{"locations": ret})
+ writeJsonQuiet(w, r, map[string]interface{}{"locations": ret})
} else {
w.WriteHeader(http.StatusNotFound)
- writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
+ writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
}
} else {
w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid})
+ writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid})
}
}
@@ -80,24 +80,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
rt, err := storage.NewReplicationTypeFromString(repType)
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": err.Error()})
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
return
}
if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
if topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
- writeJson(w, r, map[string]string{"error": "No free volumes left!"})
+ writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
return
} else {
- vg.GrowByType(rt, topo)
+ if _, err = vg.GrowByType(rt, topo); err != nil {
+ writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
+ return
+ }
}
}
fid, count, dn, err := topo.PickForWrite(rt, c)
if err == nil {
- writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
+ writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": err.Error()})
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
}
}
@@ -112,19 +115,22 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
- json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
+ if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil {
+ writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()})
+ return
+ }
debug(s, "volumes", r.FormValue("volumes"))
topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
m := make(map[string]interface{})
m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
- writeJson(w, r, m)
+ writeJsonQuiet(w, r, m)
}
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = VERSION
m["Topology"] = topo.ToMap()
- writeJson(w, r, m)
+ writeJsonQuiet(w, r, m)
}
func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
@@ -153,10 +159,10 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
}
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()})
+ writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()})
} else {
w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]interface{}{"count": count})
+ writeJsonQuiet(w, r, map[string]interface{}{"count": count})
}
}
@@ -164,7 +170,7 @@ func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = VERSION
m["Volumes"] = topo.ToVolumeMap()
- writeJson(w, r, m)
+ writeJsonQuiet(w, r, m)
}
func redirectHandler(w http.ResponseWriter, r *http.Request) {
@@ -179,7 +185,7 @@ func redirectHandler(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
} else {
w.WriteHeader(http.StatusNotFound)
- writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
+ writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
}
}
@@ -188,7 +194,11 @@ func runMaster(cmd *Command, args []string) bool {
*mMaxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*mMaxCpu)
- topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
+ var e error
+ if topo, e = topology.NewTopology("topo", *confFile, *metaFolder, "weed",
+ uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil {
+ log.Fatalf("cannot create topology:%s", e)
+ }
vg = replication.NewDefaultVolumeGrowth()
log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
http.HandleFunc("/dir/assign", dirAssignHandler)
@@ -209,9 +219,9 @@ func runMaster(cmd *Command, args []string) bool {
Handler: http.DefaultServeMux,
ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
}
- e := srv.ListenAndServe()
+ e = srv.ListenAndServe()
if e != nil {
- log.Fatalf("Fail to start:%s", e.Error())
+ log.Fatalf("Fail to start:%s", e)
}
return true
}
diff --git a/go/weed/shell.go b/go/weed/shell.go
index daf0b7e1f..4287f2148 100644
--- a/go/weed/shell.go
+++ b/go/weed/shell.go
@@ -3,6 +3,7 @@ package main
import (
"bufio"
"fmt"
+ "log"
"os"
)
@@ -25,8 +26,13 @@ func runShell(command *Command, args []string) bool {
o := bufio.NewWriter(os.Stdout)
e := bufio.NewWriter(os.Stderr)
prompt := func() {
- o.WriteString("> ")
- o.Flush()
+ var err error
+ if _, err = o.WriteString("> "); err != nil {
+ log.Printf("error writing to stdout: %s", err)
+ }
+ if err = o.Flush(); err != nil {
+ log.Printf("error flushing stdout: %s", err)
+ }
}
readLine := func() string {
ret, err := r.ReadString('\n')
@@ -38,7 +44,9 @@ func runShell(command *Command, args []string) bool {
}
execCmd := func(cmd string) int {
if cmd != "" {
- o.WriteString(cmd)
+ if _, err := o.WriteString(cmd); err != nil {
+ log.Printf("error writing to stdout: %s", err)
+ }
}
return 0
}
diff --git a/go/weed/upload.go b/go/weed/upload.go
index 92478b7b6..a47551ddf 100644
--- a/go/weed/upload.go
+++ b/go/weed/upload.go
@@ -1,14 +1,14 @@
package main
import (
+ "code.google.com/p/weed-fs/go/operation"
+ "code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/util"
"strconv"
)
diff --git a/go/weed/volume.go b/go/weed/volume.go
index edf2ad821..fd2298541 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -2,13 +2,13 @@ package main
import (
"bytes"
+ "code.google.com/p/weed-fs/go/operation"
+ "code.google.com/p/weed-fs/go/storage"
"log"
"math/rand"
"mime"
"net/http"
"os"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/storage"
"runtime"
"strconv"
"strings"
@@ -48,41 +48,41 @@ func statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = VERSION
m["Volumes"] = store.Status()
- writeJson(w, r, m)
+ writeJsonQuiet(w, r, m)
}
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
if err == nil {
- writeJson(w, r, map[string]string{"error": ""})
+ writeJsonQuiet(w, r, map[string]string{"error": ""})
} else {
- writeJson(w, r, map[string]string{"error": err.Error()})
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
}
debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
}
func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
if err == nil {
- writeJson(w, r, map[string]interface{}{"error": "", "result": ret})
+ writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret})
} else {
- writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false})
+ writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false})
}
debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
}
func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
err := store.CompactVolume(r.FormValue("volume"))
if err == nil {
- writeJson(w, r, map[string]string{"error": ""})
+ writeJsonQuiet(w, r, map[string]string{"error": ""})
} else {
- writeJson(w, r, map[string]string{"error": err.Error()})
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
}
debug("compacted volume =", r.FormValue("volume"), ", error =", err)
}
func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
err := store.CommitCompactVolume(r.FormValue("volume"))
if err == nil {
- writeJson(w, r, map[string]interface{}{"error": ""})
+ writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
} else {
- writeJson(w, r, map[string]string{"error": err.Error()})
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
}
debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
}
@@ -163,18 +163,29 @@ func GetHandler(w http.ResponseWriter, r *http.Request) {
}
}
w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
- w.Write(n.Data)
+ if _, e = w.Write(n.Data); e != nil {
+ debug("response write error:", e)
+ }
}
func PostHandler(w http.ResponseWriter, r *http.Request) {
- r.ParseForm()
+ if e := r.ParseForm(); e != nil {
+ debug("form parse error:", e)
+ writeJsonQuiet(w, r, e)
+ return
+ }
vid, _, _ := parseURLPath(r.URL.Path)
volumeId, e := storage.NewVolumeId(vid)
if e != nil {
- writeJson(w, r, e)
+ debug("NewVolumeId error:", e)
+ writeJsonQuiet(w, r, e)
+ return
+ }
+ if e != nil {
+ writeJsonQuiet(w, r, e)
} else {
needle, filename, ne := storage.NewNeedle(r)
if ne != nil {
- writeJson(w, r, ne)
+ writeJsonQuiet(w, r, ne)
} else {
ret, err := store.Write(volumeId, needle)
errorStatus := ""
@@ -204,15 +215,19 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
if errorStatus == "" {
w.WriteHeader(http.StatusCreated)
} else {
- store.Delete(volumeId, needle)
- distributedOperation(volumeId, func(location operation.Location) bool {
- return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
- })
+ if _, e = store.Delete(volumeId, needle); e != nil {
+ errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
+ strconv.FormatUint(uint64(volumeId), 10) + ": " + e.Error()
+ } else {
+ distributedOperation(volumeId, func(location operation.Location) bool {
+ return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
+ })
+ }
w.WriteHeader(http.StatusInternalServerError)
m["error"] = errorStatus
}
m["size"] = ret
- writeJson(w, r, m)
+ writeJsonQuiet(w, r, m)
}
}
}
@@ -230,7 +245,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
- writeJson(w, r, m)
+ writeJsonQuiet(w, r, m)
return
}
@@ -268,7 +283,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]uint32)
m["size"] = uint32(count)
- writeJson(w, r, m)
+ writeJsonQuiet(w, r, m)
}
func parseURLPath(path string) (vid, fid, ext string) {
diff --git a/go/weed/weed.go b/go/weed/weed.go
index c03cb68ac..e97c8b550 100644
--- a/go/weed/weed.go
+++ b/go/weed/weed.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
+ "log"
"math/rand"
"net/http"
"os"
@@ -173,22 +174,40 @@ func exitIfErrors() {
exit()
}
}
-func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
+
+func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) {
w.Header().Set("Content-Type", "application/javascript")
var bytes []byte
if r.FormValue("pretty") != "" {
- bytes, _ = json.MarshalIndent(obj, "", " ")
+ bytes, err = json.MarshalIndent(obj, "", " ")
} else {
- bytes, _ = json.Marshal(obj)
+ bytes, err = json.Marshal(obj)
+ }
+ if err != nil {
+ return
}
callback := r.FormValue("callback")
if callback == "" {
- w.Write(bytes)
+ _, err = w.Write(bytes)
} else {
- w.Write([]uint8(callback))
- w.Write([]uint8("("))
+ if _, err = w.Write([]uint8(callback)); err != nil {
+ return
+ }
+ if _, err = w.Write([]uint8("(")); err != nil {
+ return
+ }
fmt.Fprint(w, string(bytes))
- w.Write([]uint8(")"))
+ if _, err = w.Write([]uint8(")")); err != nil {
+ return
+ }
+ }
+ return
+}
+
+// wrapper for writeJson - just logs errors
+func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) {
+ if err := writeJson(w, r, obj); err != nil {
+ log.Printf("error writing JSON %s: %s", obj, err)
}
}