diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_server.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 11 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 11 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 7 | ||||
| -rw-r--r-- | weed/server/volume_server_tcp_handlers_write.go | 137 |
5 files changed, 150 insertions, 18 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 22474a5e2..9e0770afa 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -153,7 +153,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) func (fs *FilerServer) checkWithMaster() { for _, master := range fs.option.Masters { - _, err := pb.ParseFilerGrpcAddress(master) + _, err := pb.ParseServerToGrpcAddress(master) if err != nil { glog.Fatalf("invalid master address %s: %v", master, err) } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index f77462adb..892a732f7 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -61,15 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } - if len(entry.Chunks) == 0 && len(entry.Content) == 0 { - glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr) - stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc() - w.WriteHeader(http.StatusNoContent) - return - } - w.Header().Set("Accept-Ranges", "bytes") - w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) // mime type mimeType := entry.Attr.Mime @@ -164,6 +156,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } if offset+size <= int64(len(entry.Content)) { _, err := writer.Write(entry.Content[offset : offset+size]) + if err != nil { + glog.Errorf("failed to write entry content: %v", err) + } return err } return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index d3ce7e605..318399281 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -38,10 +38,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * chunkSize := 1024 * 1024 * maxMB - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() + stats.FilerRequestCounter.WithLabelValues("chunk").Inc() start := time.Now() defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) + stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds()) }() var reply *FilerPostResult @@ -302,13 +302,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { - stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc() + stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc() start := time.Now() defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) + stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds()) }() uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + if uploadResult != nil && uploadResult.RetryCount > 0 { + stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount)) + } return uploadResult, err, data } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 29aff5c0b..156afd4a1 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if !ms.Topo.HasWritableVolume(option) { if ms.Topo.AvailableSpaceFor(option) <= 0 { - return nil, fmt.Errorf("no free volumes left for "+option.String()) + return nil, fmt.Errorf("no free volumes left for " + option.String()) } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { @@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType)) stats := volumeLayout.Stats() - - totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 - resp := &master_pb.StatisticsResponse{ - TotalSize: uint64(totalSize), + TotalSize: stats.TotalSize, UsedSize: stats.UsedSize, FileCount: stats.FileCount, } diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go new file mode 100644 index 000000000..a009611da --- /dev/null +++ b/weed/server/volume_server_tcp_handlers_write.go @@ -0,0 +1,137 @@ +package weed_server + +import ( + "bufio" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "net" + "strings" +) + +func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { + defer c.Close() + + glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) + + bufReader := bufio.NewReaderSize(c, 1024*1024) + bufWriter := bufio.NewWriterSize(c, 1024*1024) + + for { + cmd, err := bufReader.ReadString('\n') + if err != nil { + if err != io.EOF { + glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) + } + return + } + cmd = cmd[:len(cmd)-1] + switch cmd[0] { + case '+': + fileId := cmd[1:] + err = vs.handleTcpPut(fileId, bufReader) + if err == nil { + bufWriter.Write([]byte("+OK\n")) + } else { + bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) + } + case '-': + fileId := cmd[1:] + err = vs.handleTcpDelete(fileId) + if err == nil { + bufWriter.Write([]byte("+OK\n")) + } else { + bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) + } + case '?': + fileId := cmd[1:] + err = vs.handleTcpGet(fileId, bufWriter) + case '!': + bufWriter.Flush() + } + + } + +} + +func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + volume := vs.store.GetVolume(volumeId) + if volume == nil { + return fmt.Errorf("volume %d not found", volumeId) + } + + err = volume.StreamRead(n, writer) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + volume := vs.store.GetVolume(volumeId) + if volume == nil { + return fmt.Errorf("volume %d not found", volumeId) + } + + sizeBuf := make([]byte, 4) + if _, err = bufReader.Read(sizeBuf); err != nil { + return err + } + dataSize := util.BytesToUint32(sizeBuf) + + err = volume.StreamWrite(n, bufReader, dataSize) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + _, err = vs.store.DeleteVolumeNeedle(volumeId, n) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { + + commaIndex := strings.LastIndex(fileId, ",") + if commaIndex <= 0 { + return 0, nil, fmt.Errorf("unknown fileId %s", fileId) + } + + vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] + + volumeId, ve := needle.NewVolumeId(vid) + if ve != nil { + return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) + } + + n := new(needle.Needle) + n.ParsePath(fid) + return volumeId, n, nil +} |
