diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 117 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 64 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_admin.go | 5 | ||||
| -rw-r--r-- | weed/server/volume_server_ui/volume.html | 14 |
4 files changed, 181 insertions, 19 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 0e733fc0a..84a9035ca 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -402,3 +402,120 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v return nil } + +// ReceiveFile receives a file stream from client and writes it to storage +func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_ReceiveFileServer) error { + var fileInfo *volume_server_pb.ReceiveFileInfo + var targetFile *os.File + var filePath string + var bytesWritten uint64 + + defer func() { + if targetFile != nil { + targetFile.Close() + } + }() + + for { + req, err := stream.Recv() + if err == io.EOF { + // Stream completed successfully + if targetFile != nil { + targetFile.Sync() + glog.V(1).Infof("Successfully received file %s (%d bytes)", filePath, bytesWritten) + } + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + BytesWritten: bytesWritten, + }) + } + if err != nil { + // Clean up on error + if targetFile != nil { + targetFile.Close() + os.Remove(filePath) + } + glog.Errorf("Failed to receive stream: %v", err) + return fmt.Errorf("failed to receive stream: %v", err) + } + + switch data := req.Data.(type) { + case *volume_server_pb.ReceiveFileRequest_Info: + // First message contains file info + fileInfo = data.Info + glog.V(1).Infof("ReceiveFile: volume %d, ext %s, collection %s, shard %d, size %d", + fileInfo.VolumeId, fileInfo.Ext, fileInfo.Collection, fileInfo.ShardId, fileInfo.FileSize) + + // Create file path based on file info + if fileInfo.IsEcVolume { + // Find storage location for EC shard + var targetLocation *storage.DiskLocation + for _, location := range vs.store.Locations { + if location.DiskType == types.HardDriveType { + targetLocation = location + break + } + } + if targetLocation == nil && len(vs.store.Locations) > 0 { + targetLocation = vs.store.Locations[0] // Fall back to first available location + } + if targetLocation == nil { + glog.Errorf("ReceiveFile: no storage location available") + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + Error: "no storage location available", + }) + } + + // Create EC shard file path + baseFileName := erasure_coding.EcShardBaseFileName(fileInfo.Collection, int(fileInfo.VolumeId)) + filePath = util.Join(targetLocation.Directory, baseFileName+fileInfo.Ext) + } else { + // Regular volume file + v := vs.store.GetVolume(needle.VolumeId(fileInfo.VolumeId)) + if v == nil { + glog.Errorf("ReceiveFile: volume %d not found", fileInfo.VolumeId) + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + Error: fmt.Sprintf("volume %d not found", fileInfo.VolumeId), + }) + } + filePath = v.FileName(fileInfo.Ext) + } + + // Create target file + targetFile, err = os.Create(filePath) + if err != nil { + glog.Errorf("ReceiveFile: failed to create file %s: %v", filePath, err) + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + Error: fmt.Sprintf("failed to create file: %v", err), + }) + } + glog.V(1).Infof("ReceiveFile: created target file %s", filePath) + + case *volume_server_pb.ReceiveFileRequest_FileContent: + // Subsequent messages contain file content + if targetFile == nil { + glog.Errorf("ReceiveFile: file info must be sent first") + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + Error: "file info must be sent first", + }) + } + + n, err := targetFile.Write(data.FileContent) + if err != nil { + targetFile.Close() + os.Remove(filePath) + glog.Errorf("ReceiveFile: failed to write to file %s: %v", filePath, err) + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + Error: fmt.Sprintf("failed to write file: %v", err), + }) + } + bytesWritten += uint64(n) + glog.V(2).Infof("ReceiveFile: wrote %d bytes to %s (total: %d)", n, filePath, bytesWritten) + + default: + glog.Errorf("ReceiveFile: unknown message type") + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + Error: "unknown message type", + }) + } + } +} diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 23cc29e0d..5981c5efe 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -141,20 +141,31 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv glog.V(0).Infof("VolumeEcShardsCopy: %v", req) var location *storage.DiskLocation - if req.CopyEcxFile { - location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { - return location.DiskType == types.HardDriveType - }) + + // Use disk_id if provided (disk-aware storage) + if req.DiskId > 0 || (req.DiskId == 0 && len(vs.store.Locations) > 0) { + // Validate disk ID is within bounds + if int(req.DiskId) >= len(vs.store.Locations) { + return nil, fmt.Errorf("invalid disk_id %d: only have %d disks", req.DiskId, len(vs.store.Locations)) + } + + // Use the specific disk location + location = vs.store.Locations[req.DiskId] + glog.V(1).Infof("Using disk %d for EC shard copy: %s", req.DiskId, location.Directory) } else { - location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { - //(location.FindEcVolume) This method is error, will cause location is nil, redundant judgment - // _, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)) - // return found - return true - }) - } - if location == nil { - return nil, fmt.Errorf("no space left") + // Fallback to old behavior for backward compatibility + if req.CopyEcxFile { + location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { + return location.DiskType == types.HardDriveType + }) + } else { + location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { + return true + }) + } + if location == nil { + return nil, fmt.Errorf("no space left") + } } dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) @@ -467,3 +478,30 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil } + +func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) { + glog.V(0).Infof("VolumeEcShardsInfo: volume %d", req.VolumeId) + + var ecShardInfos []*volume_server_pb.EcShardInfo + + // Find the EC volume + for _, location := range vs.store.Locations { + if v, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found { + // Get shard details from the EC volume + shardDetails := v.ShardDetails() + for _, shardDetail := range shardDetails { + ecShardInfo := &volume_server_pb.EcShardInfo{ + ShardId: uint32(shardDetail.ShardId), + Size: shardDetail.Size, + Collection: v.Collection, + } + ecShardInfos = append(ecShardInfos, ecShardInfo) + } + break + } + } + + return &volume_server_pb.VolumeEcShardsInfoResponse{ + EcShardInfos: ecShardInfos, + }, nil +} diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 27797add3..ec6490662 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -1,11 +1,12 @@ package weed_server import ( - "github.com/seaweedfs/seaweedfs/weed/topology" - "github.com/seaweedfs/seaweedfs/weed/util/version" "net/http" "path/filepath" + "github.com/seaweedfs/seaweedfs/weed/topology" + "github.com/seaweedfs/seaweedfs/weed/util/version" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/stats" ) diff --git a/weed/server/volume_server_ui/volume.html b/weed/server/volume_server_ui/volume.html index 565d14368..605eb52f0 100644 --- a/weed/server/volume_server_ui/volume.html +++ b/weed/server/volume_server_ui/volume.html @@ -175,8 +175,8 @@ <tr> <th>Id</th> <th>Collection</th> - <th>Shard Size</th> - <th>Shards</th> + <th>Total Size</th> + <th>Shard Details</th> <th>CreatedAt</th> </tr> </thead> @@ -185,8 +185,14 @@ <tr> <td><code>{{ .VolumeId }}</code></td> <td>{{ .Collection }}</td> - <td>{{ bytesToHumanReadable .ShardSize }}</td> - <td>{{ .ShardIdList }}</td> + <td>{{ bytesToHumanReadable .Size }}</td> + <td> + {{ range .ShardDetails }} + <span class="label label-info" style="margin-right: 5px;"> + {{ .ShardId }}: {{ bytesToHumanReadable .Size }} + </span> + {{ end }} + </td> <td>{{ .CreatedAt.Format "2006-01-02 15:04" }}</td> </tr> {{ end }} |
