| author | Chris Lu <chris.lu@gmail.com> | 2024-02-04 10:44:01 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2024-02-04 10:44:01 -0800 |
| commit | 53612b770c029af3747fd2bc96f854fb3685aadb (patch) | |
| tree | 4395819cfefbf0fef0ccb1d3b942877764366cfa /weed | |
| parent | 0a12301b3d3eb560d8f50c459cb58e62aea7d753 (diff) | |
| parent | 56df44845f02d9c1f37e957df1b09fd1a6d9a7fd (diff) | |
| download | seaweedfs-53612b770c029af3747fd2bc96f854fb3685aadb.tar.xz seaweedfs-53612b770c029af3747fd2bc96f854fb3685aadb.zip | |
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | weed/filer/filechunks_read.go | 4 |
| -rw-r--r-- | weed/filer/filechunks_test.go | 15 |
| -rw-r--r-- | weed/filer/stream.go | 89 |
| -rw-r--r-- | weed/s3api/s3api_object_handlers_postpolicy.go | 29 |
| -rw-r--r-- | weed/server/common.go | 42 |
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 36 |
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 19 |
| -rw-r--r-- | weed/shell/command_cluster_check.go | 29 |
| -rw-r--r-- | weed/shell/command_volume_balance.go | 32 |
| -rw-r--r-- | weed/shell/command_volume_balance_test.go | 2 |
| -rw-r--r-- | weed/storage/needle_map/memdb.go | 30 |
| -rw-r--r-- | weed/storage/volume_info/volume_info.go | 5 |
| -rw-r--r-- | weed/topology/topology.go | 4 |
13 files changed, 201 insertions, 135 deletions
weed/filer/filechunks_read.go (b8768ed63..11b297a3c): In readResolvedChunks, the tie-break in the sort of chunk start/stop points is inverted: when two points share the same timestamp, a start point now sorts after a non-start point (return 1 when a.isStart, -1 when b.isStart), so an interval is closed before the next one at the same timestamp is opened.

weed/filer/filechunks_test.go (b448950a9..7554b0080): Adds TestCompactFileChunks3: four chunks at offsets 0, 100, 200 and 300, where the first two share ModifiedTsNs 50; CompactFileChunks is expected to keep all four chunks.
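The effect of the comparator change is easier to see in isolation. Below is a minimal, standalone sketch using the standard-library slices package and a simplified point type (not the actual filer structs): with the new tie-break, a stop event sorts before a start event that carries the same timestamp.

```go
package main

import (
	"fmt"
	"slices"
)

// point mirrors the start/stop events that readResolvedChunks sorts;
// the type and field names here are illustrative, not the filer types.
type point struct {
	ts      int64
	isStart bool
}

func main() {
	points := []point{
		{ts: 50, isStart: true},  // start of a newer chunk
		{ts: 50, isStart: false}, // stop of an older chunk at the same timestamp
		{ts: 10, isStart: true},
	}

	slices.SortFunc(points, func(a, b point) int {
		if a.ts != b.ts {
			return int(a.ts - b.ts)
		}
		// After the fix: at equal timestamps, stop events sort before start
		// events, so an interval is closed before the next one is opened.
		if a.isStart {
			return 1
		}
		if b.isStart {
			return -1
		}
		return 0
	})

	for _, p := range points {
		fmt.Printf("ts=%d isStart=%v\n", p.ts, p.isStart)
	}
	// Output order: ts=10 start, ts=50 stop, ts=50 start.
}
```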
weed/filer/stream.go (51a82fb2e..2686fd833): StreamContentWithThrottler is replaced by PrepareStreamContentWithThrottler, which splits streaming into two phases. The preparation phase resolves the chunk views and looks up all volume URLs, returning an error before anything is written if a LookupFileId call fails or yields no URLs; it then returns a closure of the new type DoStreamContent (func(writer io.Writer) error). The returned closure does the actual work: zero-fills gaps between chunk views, fetches each chunk with retriedStreamFetchChunkData, records the chunkDownload histogram and counters, and throttles by downloadMaxBytesPs. StreamContent remains as a thin wrapper that calls PrepareStreamContent and immediately invokes the result. The golang.org/x/exp/slices import moves into the grouped third-party imports.
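The prepare/execute split is the core of this change. Below is a self-contained sketch of the same pattern under simplified assumptions: the chunk map, the prepareStream helper and the DoStream alias are illustrative only; in SeaweedFS the prepare step performs the volume-URL lookups and the closure has the filer.DoStreamContent shape.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// DoStream mirrors filer.DoStreamContent: everything that can fail before any
// bytes are written happens in the prepare step; the closure only writes.
type DoStream func(w io.Writer) error

// prepareStream is a hypothetical stand-in for PrepareStreamContent: it
// "resolves" chunk ids to readers up front and fails early if a lookup fails.
func prepareStream(chunks map[string]string, order []string) (DoStream, error) {
	readers := make([]io.Reader, 0, len(order))
	for _, id := range order {
		data, ok := chunks[id]
		if !ok {
			return nil, fmt.Errorf("lookup %s: not found", id) // before headers/body are sent
		}
		readers = append(readers, strings.NewReader(data))
	}
	return func(w io.Writer) error {
		for _, r := range readers {
			if _, err := io.Copy(w, r); err != nil {
				return err
			}
		}
		return nil
	}, nil
}

func main() {
	streamFn, err := prepareStream(
		map[string]string{"a": "hello ", "b": "world\n"},
		[]string{"a", "b"},
	)
	if err != nil {
		fmt.Fprintln(os.Stderr, "prepare:", err)
		return
	}
	_ = streamFn(os.Stdout)
}
```

Because preparation can fail before a single byte is written, callers such as processRangeRequest can still return a clean error status instead of truncating a response mid-body.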
weed/s3api/s3api_object_handlers_postpolicy.go (8dd3900ed..1e7861129): extractPostPolicyFormValues gains a fileContentType return value, taken from the Content-Type header of the uploaded file part, and all of its error returns are widened for the extra value. PostPolicyBucketHandler now sets the request's Content-Type before forwarding the upload to the filer: it prefers the form's Content-Type field and falls back to the file part's content type.
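A standalone sketch of that selection order using only mime/multipart. The in-memory form construction and the 32 MiB ReadForm limit are illustrative; the real handler canonicalizes the form values into an http.Header before reading them.

```go
package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
)

func main() {
	// Build a POST-policy-like form in memory, purely for illustration.
	var buf bytes.Buffer
	mw := multipart.NewWriter(&buf)
	_ = mw.WriteField("key", "uploads/cat.png")
	// No explicit Content-Type form field, so the file part's type should win.
	fw, _ := mw.CreateFormFile("file", "cat.png") // part Content-Type: application/octet-stream
	_, _ = fw.Write([]byte("not really a png"))
	mw.Close()

	mr := multipart.NewReader(&buf, mw.Boundary())
	form, err := mr.ReadForm(32 << 20)
	if err != nil {
		panic(err)
	}
	defer form.RemoveAll()

	// Prefer the form's Content-Type field, fall back to the file part header.
	contentType := ""
	if vals := form.Value["Content-Type"]; len(vals) > 0 {
		contentType = vals[0]
	}
	if contentType == "" {
		if files := form.File["file"]; len(files) > 0 {
			contentType = files[0].Header.Get("Content-Type")
		}
	}
	fmt.Println("forwarding with Content-Type:", contentType)
}
```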
weed/server/common.go (d88298402..a7d67fb2e): processRangeRequest's callback changes from writeFn(writer, offset, size) error to prepareWriteFn(offset, size) (filer.DoStreamContent, error). For requests without a Range header and for single-range requests, the write function is prepared first, and a preparation error is logged and answered with 500 Internal Server Error before the status code or Content-Range headers are committed. For multi-range requests, a write function is prepared for every range up front (kept in a map keyed by range index), and the goroutine that builds the multipart/byteranges body invokes the prepared functions. Imports are regrouped and the filer package is added.
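The point of preparing before writing is that an error can still change the response status. A minimal illustration with plain net/http and httptest, not the SeaweedFS handler; serveWhole and the sample data are made up.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// prepareWriteFn mirrors the new processRangeRequest callback shape:
// preparation may fail before any bytes (or status code) have been written.
type prepareWriteFn func(offset, size int64) (func(io.Writer) error, error)

func serveWhole(w http.ResponseWriter, totalSize int64, prepare prepareWriteFn) {
	writeFn, err := prepare(0, totalSize)
	if err != nil {
		// Nothing has been written yet, so this can still be a clean 500.
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Length", fmt.Sprint(totalSize))
	w.WriteHeader(http.StatusOK) // commit the status only after preparation succeeded
	_ = writeFn(w)
}

func main() {
	data := "hello, range world\n"
	prepare := func(offset, size int64) (func(io.Writer) error, error) {
		if offset < 0 || offset+size > int64(len(data)) {
			return nil, fmt.Errorf("requested [%d,%d) is out of range", offset, offset+size)
		}
		return func(w io.Writer) error {
			_, err := io.Copy(w, strings.NewReader(data[offset:offset+size]))
			return err
		}, nil
	}

	rec := httptest.NewRecorder()
	serveWhole(rec, int64(len(data)), prepare)
	fmt.Printf("status=%d body=%q\n", rec.Code, rec.Body.String())
}
```

In the real handler the same pattern also guards the single-range and multi-range paths, where headers such as Content-Range would otherwise already be on the wire.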
weed/server/filer_server_handlers_read.go (6bdd6a9dd..d1cd3beae): checkPreconditions now calls setEtag(w, etag) before answering 304 Not Modified on both the If-None-Match and the If-Modified-Since paths, and GetOrHeadHandler computes the ETag after the precondition check instead of before it. The range callback is rewritten in the prepare/stream shape: for small entries served from entry.Content it returns a closure that writes the requested slice (counting ErrorWriteEntry on failure); for remote-only entries it first caches the object to the local cluster and returns a prepare-phase error if that fails; otherwise it calls filer.PrepareStreamContentWithThrottler and wraps the returned stream function so streaming errors still increment ErrorReadStream and get logged.
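For the conditional-GET change, the visible difference is that a 304 now carries the validator it matched. A minimal sketch with generic net/http, not the filer handler; the ETag value and path are made up, and the real code canonicalizes ETags before comparing.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// An illustrative conditional-GET handler: a 304 response repeats validator
// headers such as ETag, which is what the filer read handler now does.
func handler(w http.ResponseWriter, r *http.Request) {
	const etag = `"abc123"`
	if r.Header.Get("If-None-Match") == etag {
		w.Header().Set("ETag", etag) // previously the 304 went out without this header
		w.WriteHeader(http.StatusNotModified)
		return
	}
	w.Header().Set("ETag", etag)
	fmt.Fprint(w, "full body")
}

func main() {
	req := httptest.NewRequest(http.MethodGet, "/some/file", nil)
	req.Header.Set("If-None-Match", `"abc123"`)
	rec := httptest.NewRecorder()
	handler(rec, req)
	fmt.Printf("status=%d etag=%s\n", rec.Code, rec.Header().Get("ETag"))
}
```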
weed/server/volume_server_handlers_read.go (5f4beb77d..08e536811): writeResponseContent and streamWriteResponseContent adapt to the new processRangeRequest signature: each now returns a filer.DoStreamContent closure that seeks and copies from the io.ReadSeeker, or calls vs.store.ReadVolumeNeedleDataInto, for the requested offset and size. The filer package import is added.

weed/shell/command_cluster_check.go (3fb72940f..03acca5b2): The volume check no longer hardcodes the "" and "hdd" disk types: it fails if no disk type is defined at all, and it fails for any defined disk type whose MaxVolumeCount is 0. A new check runs only when filers are present: filer metadata logs still require a generic ("") or "hdd" disk type to exist and to have volume capacity. Collecting the master list is simplified to a single append(masters, commandEnv.MasterClient.GetMasters()...).
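A trimmed-down sketch of the generalized per-disk-type check; diskInfo is a stand-in for the master_pb DiskInfo message with only the field the check cares about.

```go
package main

import "fmt"

// diskInfo keeps only MaxVolumeCount from the real DiskInfo message.
type diskInfo struct {
	MaxVolumeCount int64
}

// checkDiskTypes mirrors the generalized cluster.check logic: every configured
// disk type must have at least one volume slot, instead of hardcoding "" and "hdd".
func checkDiskTypes(diskInfos map[string]*diskInfo) error {
	if len(diskInfos) == 0 {
		return fmt.Errorf("no disk type defined")
	}
	for diskType, info := range diskInfos {
		if info.MaxVolumeCount == 0 {
			return fmt.Errorf("no volume available for %q disk type", diskType)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkDiskTypes(map[string]*diskInfo{
		"ssd": {MaxVolumeCount: 8},
		"hdd": {MaxVolumeCount: 0}, // triggers the error
	}))
}
```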
weed/shell/command_volume_balance.go (2284ceea6..cb201e064): balanceVolumeServers and balanceVolumeServersByDiskType drop the volumeSizeLimit parameter, so Do no longer reads volumeSizeLimitMb from the topology and the dedicated "ALL_COLLECTIONS" branch collapses into the generic collection branch. Map iterations drop the unused value variable (for diskType := range ...). Both sort comparators switch from subtract-and-truncate to cmp.Compare: sortWritableVolumes compares volume sizes, and balanceSelectedVolume compares local volume ratios. balanceSelectedVolume also remembers the index of the chosen full node and considers only nodesWithCapacity[:fullNodeIndex] as move targets instead of iterating all but the last node.
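The comparator change matters because the old style truncated the difference to an int: for the float64 volume ratios any gap below 1.0 became 0 ("equal"), and for uint64 sizes the subtraction can wrap. A small demonstration with the standard cmp and slices packages; the values are made up.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

func main() {
	// Local volume ratios are float64; the old comparator subtracted them and
	// cast to int, so a difference of 0.7 truncated to 0 and the sort saw the
	// values as equal. cmp.Compare keeps the ordering exact.
	ratios := []float64{0.9, 0.2, 0.5}

	old := func(a, b float64) int { return int(a - b) } // old-style comparator
	fmt.Println("old comparator on 0.2 vs 0.9:", old(0.2, 0.9)) // prints 0: "equal"

	slices.SortFunc(ratios, func(a, b float64) int { return cmp.Compare(a, b) })
	fmt.Println("sorted:", ratios) // [0.2 0.5 0.9]
}
```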
weed/shell/command_volume_balance_test.go (20c5abdf8..ce0aeb5ab): TestBalance is updated to the new balanceVolumeServers signature; the 30 GiB volume size limit argument is gone.

weed/storage/needle_map/memdb.go (fb3d0130d..d3d47b605): MemDb.Set builds its key/value encoding with bytes := ToBytes(key, offset, size) before the leveldb Put. AscendingVisitByOffset, which collected all entries and sorted them by offset bytes, is removed along with the now-unused "bytes" and "sort" imports; SaveToIdx writes entries in AscendingVisit (needle-id) order instead, still skipping zero-offset and deleted entries.
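AscendingVisit relies on the fact that a leveldb iterator already yields entries in ascending key order. A minimal sketch with the same goleveldb library the MemDb uses; the in-memory storage, keys and values here are illustrative, not the MemDb encoding.

```go
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
)

func main() {
	// Open an in-memory leveldb, like NewMemDb does.
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	_ = db.Put([]byte("needle-03"), []byte("c"), nil)
	_ = db.Put([]byte("needle-01"), []byte("a"), nil)
	_ = db.Put([]byte("needle-02"), []byte("b"), nil)

	// The iterator returns keys in ascending order without any extra sorting.
	iter := db.NewIterator(nil, nil)
	defer iter.Release()
	for iter.Next() {
		fmt.Printf("%s => %s\n", iter.Key(), iter.Value())
	}
	if err := iter.Error(); err != nil {
		panic(err)
	}
}
```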
weed/storage/volume_info/volume_info.go (2c6896830..59c08a833): SaveVolumeInfo writes the marshalled volume info file with mode 0644 instead of 0755 and folds the write-error handling into a single if statement.

weed/topology/topology.go (03d7570c1..0a4cb4050): UnRegisterVolumeLayout now resets the stats.MasterReplicaPlacementMismatch gauge to 0 for the volume's collection and id when the volume's replica placement expects more than one copy, so the mismatch metric does not linger after the volume is removed; the stats package import is added.
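A sketch of the same metric hygiene with the Prometheus client library; the metric name and labels below are made up, while SeaweedFS uses stats.MasterReplicaPlacementMismatch with collection and volume id labels.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// A per-volume gauge; resetting it to 0 when the volume goes away keeps stale
// mismatch alerts from lingering after the volume is unregistered.
var replicaPlacementMismatch = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "example_replica_placement_mismatch",
		Help: "1 if a volume's replica placement is not satisfied",
	},
	[]string{"collection", "id"},
)

func main() {
	replicaPlacementMismatch.WithLabelValues("pictures", "42").Set(1)

	// ... volume 42 is unregistered from the topology ...
	replicaPlacementMismatch.WithLabelValues("pictures", "42").Set(0)

	// Read the current value back, just to show the reset took effect.
	m := &dto.Metric{}
	_ = replicaPlacementMismatch.WithLabelValues("pictures", "42").Write(m)
	fmt.Println("gauge value:", m.GetGauge().GetValue())
}
```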
