aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/filer_server_handlers_read.go11
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go11
-rw-r--r--weed/server/master_grpc_server_volume.go7
-rw-r--r--weed/server/volume_server_tcp_handlers_write.go137
5 files changed, 150 insertions, 18 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 22474a5e2..9e0770afa 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -153,7 +153,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
for _, master := range fs.option.Masters {
- _, err := pb.ParseFilerGrpcAddress(master)
+ _, err := pb.ParseServerToGrpcAddress(master)
if err != nil {
glog.Fatalf("invalid master address %s: %v", master, err)
}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index f77462adb..892a732f7 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -61,15 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
- if len(entry.Chunks) == 0 && len(entry.Content) == 0 {
- glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
- stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
- w.WriteHeader(http.StatusNoContent)
- return
- }
-
w.Header().Set("Accept-Ranges", "bytes")
- w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
// mime type
mimeType := entry.Attr.Mime
@@ -164,6 +156,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
+ if err != nil {
+ glog.Errorf("failed to write entry content: %v", err)
+ }
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d3ce7e605..318399281 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -38,10 +38,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
chunkSize := 1024 * 1024 * maxMB
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ stats.FilerRequestCounter.WithLabelValues("chunk").Inc()
start := time.Now()
defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds())
}()
var reply *FilerPostResult
@@ -302,13 +302,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
- stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
+ stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
start := time.Now()
defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
+ stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
}()
uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ if uploadResult != nil && uploadResult.RetryCount > 0 {
+ stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
+ }
return uploadResult, err, data
}
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 29aff5c0b..156afd4a1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
- return nil, fmt.Errorf("no free volumes left for "+option.String())
+ return nil, fmt.Errorf("no free volumes left for " + option.String())
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
@@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
-
- totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
-
resp := &master_pb.StatisticsResponse{
- TotalSize: uint64(totalSize),
+ TotalSize: stats.TotalSize,
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
new file mode 100644
index 000000000..a009611da
--- /dev/null
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -0,0 +1,137 @@
+package weed_server
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "net"
+ "strings"
+)
+
+func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
+ defer c.Close()
+
+ glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
+
+ bufReader := bufio.NewReaderSize(c, 1024*1024)
+ bufWriter := bufio.NewWriterSize(c, 1024*1024)
+
+ for {
+ cmd, err := bufReader.ReadString('\n')
+ if err != nil {
+ if err != io.EOF {
+ glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ }
+ return
+ }
+ cmd = cmd[:len(cmd)-1]
+ switch cmd[0] {
+ case '+':
+ fileId := cmd[1:]
+ err = vs.handleTcpPut(fileId, bufReader)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '-':
+ fileId := cmd[1:]
+ err = vs.handleTcpDelete(fileId)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '?':
+ fileId := cmd[1:]
+ err = vs.handleTcpGet(fileId, bufWriter)
+ case '!':
+ bufWriter.Flush()
+ }
+
+ }
+
+}
+
+func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ err = volume.StreamRead(n, writer)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ sizeBuf := make([]byte, 4)
+ if _, err = bufReader.Read(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ err = volume.StreamWrite(n, bufReader, dataSize)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ _, err = vs.store.DeleteVolumeNeedle(volumeId, n)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
+
+ commaIndex := strings.LastIndex(fileId, ",")
+ if commaIndex <= 0 {
+ return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
+ }
+
+ vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
+
+ volumeId, ve := needle.NewVolumeId(vid)
+ if ve != nil {
+ return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
+ }
+
+ n := new(needle.Needle)
+ n.ParsePath(fid)
+ return volumeId, n, nil
+}