diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
|---|---|---|
| committer | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
| commit | d861cbd81b75b6684c971ac00e33685e6575b833 (patch) | |
| tree | 301805fef4aa5d0096bfb1510536f7a009b661e7 /unmaintained | |
| parent | 70da715d8d917527291b35fb069fac077d17b868 (diff) | |
| parent | 4ee58922eff61a5a4ca29c0b4829b097a498549e (diff) | |
| download | seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip | |
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'unmaintained')
| -rw-r--r-- | unmaintained/change_superblock/change_superblock.go | 2 | ||||
| -rw-r--r-- | unmaintained/check_disk_size/check_disk_size.go | 42 | ||||
| -rw-r--r-- | unmaintained/compact_leveldb/compact_leveldb.go | 4 | ||||
| -rw-r--r-- | unmaintained/diff_volume_servers/diff_volume_servers.go | 196 | ||||
| -rw-r--r-- | unmaintained/fix_dat/fix_dat.go | 4 | ||||
| -rw-r--r-- | unmaintained/repeated_vacuum/repeated_vacuum.go | 62 | ||||
| -rwxr-xr-x | unmaintained/s3/benchmark/hsbench.sh | 3 | ||||
| -rw-r--r-- | unmaintained/s3/presigned_put/presigned_put.go | 73 | ||||
| -rw-r--r-- | unmaintained/see_dat/see_dat.go | 4 | ||||
| -rw-r--r-- | unmaintained/see_idx/see_idx.go | 5 | ||||
| -rw-r--r-- | unmaintained/see_log_entry/see_log_entry.go | 75 | ||||
| -rw-r--r-- | unmaintained/see_meta/see_meta.go | 6 | ||||
| -rw-r--r-- | unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go | 2 | ||||
| -rw-r--r-- | unmaintained/stress_filer_upload/write_files/write_files.go | 54 | ||||
| -rw-r--r-- | unmaintained/volume_tailer/volume_tailer.go | 11 |
15 files changed, 507 insertions, 36 deletions
diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index afe651c4e..56342a0cb 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -92,7 +92,7 @@ func main() { header := superBlock.Bytes() - if n, e := datFile.WriteAt(header, 0); n == 0 || e != nil { + if n, e := datBackend.WriteAt(header, 0); n == 0 || e != nil { glog.Fatalf("cannot write super block: %v", e) } diff --git a/unmaintained/check_disk_size/check_disk_size.go b/unmaintained/check_disk_size/check_disk_size.go new file mode 100644 index 000000000..4a8b92b88 --- /dev/null +++ b/unmaintained/check_disk_size/check_disk_size.go @@ -0,0 +1,42 @@ +package main + +import ( + "flag" + "fmt" + "runtime" + "syscall" +) + +var ( + dir = flag.String("dir", ".", "the directory which uses a disk") +) + +func main() { + flag.Parse() + + fillInDiskStatus(*dir) + + fmt.Printf("OS: %v\n", runtime.GOOS) + fmt.Printf("Arch: %v\n", runtime.GOARCH) + +} + +func fillInDiskStatus(dir string) { + fs := syscall.Statfs_t{} + err := syscall.Statfs(dir, &fs) + if err != nil { + fmt.Printf("failed to statfs on %s: %v\n", dir, err) + return + } + fmt.Printf("statfs: %+v\n", fs) + fmt.Println() + + total := fs.Blocks * uint64(fs.Bsize) + free := fs.Bfree * uint64(fs.Bsize) + fmt.Printf("Total: %d blocks x %d block size = %d bytes\n", fs.Blocks, uint64(fs.Bsize), total) + fmt.Printf("Free : %d blocks x %d block size = %d bytes\n", fs.Bfree, uint64(fs.Bsize), free) + fmt.Printf("Used : %d blocks x %d block size = %d bytes\n", fs.Blocks-fs.Bfree, uint64(fs.Bsize), total-free) + fmt.Printf("Free Percentage : %.2f%%\n", float32((float64(free)/float64(total))*100)) + fmt.Printf("Used Percentage : %.2f%%\n", float32((float64(total-free)/float64(total))*100)) + return +} diff --git a/unmaintained/compact_leveldb/compact_leveldb.go b/unmaintained/compact_leveldb/compact_leveldb.go index 317356c3f..9be5697de 100644 --- a/unmaintained/compact_leveldb/compact_leveldb.go +++ b/unmaintained/compact_leveldb/compact_leveldb.go @@ -5,6 +5,7 @@ import ( "log" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" ) @@ -25,6 +26,9 @@ func main() { } db, err := leveldb.OpenFile(*dir, opts) + if errors.IsCorrupted(err) { + db, err = leveldb.RecoverFile(*dir, opts) + } if err != nil { log.Fatal(err) } diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go new file mode 100644 index 000000000..27a537617 --- /dev/null +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -0,0 +1,196 @@ +package main + +import ( + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "math" + "os" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/idx" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +var ( + serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against") + volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers") + volumeCollection = flag.String("collection", "", "the volume collection name") + grpcDialOption grpc.DialOption +) + +/* + Diff the volume's files across multiple volume servers. + diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5 + + Example Output: + reference 127.0.0.1:8081 + fileId volumeServer message + 5,01617c3f61 127.0.0.1:8080 wrongSize +*/ +func main() { + flag.Parse() + + util.LoadConfiguration("security", false) + grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + + vid := uint32(*volumeId) + servers := strings.Split(*serversStr, ",") + if len(servers) < 2 { + glog.Fatalf("You must specify more than 1 server\n") + } + var referenceServer string + var maxOffset int64 + allFiles := map[string]map[types.NeedleId]needleState{} + for _, addr := range servers { + files, offset, err := getVolumeFiles(vid, addr) + if err != nil { + glog.Fatalf("Failed to copy idx from volume server %s\n", err) + } + allFiles[addr] = files + if offset > maxOffset { + referenceServer = addr + } + } + + same := true + fmt.Println("reference", referenceServer) + fmt.Println("fileId volumeServer message") + for nid, n := range allFiles[referenceServer] { + for addr, files := range allFiles { + if addr == referenceServer { + continue + } + var diffMsg string + n2, ok := files[nid] + if !ok { + if n.state == stateDeleted { + continue + } + diffMsg = "missing" + } else if n2.state != n.state { + switch n.state { + case stateDeleted: + diffMsg = "notDeleted" + case statePresent: + diffMsg = "deleted" + } + } else if n2.size != n.size { + diffMsg = "wrongSize" + } else { + continue + } + same = false + + // fetch the needle details + var id string + var err error + if n.state == statePresent { + id, err = getNeedleFileId(vid, nid, referenceServer) + } else { + id, err = getNeedleFileId(vid, nid, addr) + } + if err != nil { + glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err) + } + fmt.Println(id, addr, diffMsg) + } + } + if !same { + os.Exit(1) + } +} + +const ( + stateDeleted uint8 = 1 + statePresent uint8 = 2 +) + +type needleState struct { + state uint8 + size types.Size +} + +func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) { + var idxFile *bytes.Reader + err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: v, + Ext: ".idx", + CompactionRevision: math.MaxUint32, + StopOffset: math.MaxInt64, + Collection: *volumeCollection, + }) + if err != nil { + return err + } + var buf bytes.Buffer + for { + resp, err := copyFileClient.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return err + } + buf.Write(resp.FileContent) + } + idxFile = bytes.NewReader(buf.Bytes()) + return nil + }) + if err != nil { + return nil, 0, err + } + + var maxOffset int64 + files := map[types.NeedleId]needleState{} + err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { + if offset.IsZero() || size.IsDeleted() { + files[key] = needleState{ + state: stateDeleted, + size: size, + } + } else { + files[key] = needleState{ + state: statePresent, + size: size, + } + } + if actual := offset.ToActualOffset(); actual > maxOffset { + maxOffset = actual + } + return nil + }) + if err != nil { + return nil, 0, err + } + return files, maxOffset, nil +} + +func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) { + var id string + err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ + VolumeId: v, + NeedleId: uint64(nid), + }) + if err != nil { + return err + } + id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String() + return nil + }) + return id, err +} diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index d6110d870..70bce3bf9 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -98,7 +98,7 @@ func iterateEntries(datBackend backend.BackendStorageFile, idxFile *os.File, vis // parse index file entry key := util.BytesToUint64(bytes[0:8]) offsetFromIndex := util.BytesToUint32(bytes[8:12]) - sizeFromIndex := util.BytesToUint32(bytes[12:16]) + sizeFromIndex := types.BytesToSize(bytes[12:16]) count, _ = idxFile.ReadAt(bytes, readerOffset) readerOffset += int64(count) @@ -123,7 +123,7 @@ func iterateEntries(datBackend backend.BackendStorageFile, idxFile *os.File, vis } }() - if n.Size <= n.DataSize { + if n.Size <= types.Size(n.DataSize) { continue } visitNeedle(n, offset) diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 28bcabb9b..bff5becc1 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -1,51 +1,73 @@ package main import ( - "bytes" "flag" "fmt" "log" "math/rand" + "time" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) var ( - master = flag.String("master", "127.0.0.1:9333", "the master server") - repeat = flag.Int("n", 5, "repeat how many times") + master = flag.String("master", "127.0.0.1:9333", "the master server") + repeat = flag.Int("n", 5, "repeat how many times") + garbageThreshold = flag.Float64("garbageThreshold", 0.3, "garbageThreshold") + replication = flag.String("replication", "", "replication 000, 001, 002, etc") ) func main() { flag.Parse() util.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - for i := 0; i < *repeat; i++ { - assignResult, err := operation.Assign(*master, grpcDialOption, &operation.VolumeAssignRequest{Count: 1}) - if err != nil { - log.Fatalf("assign: %v", err) - } + genFile(grpcDialOption, 0) - data := make([]byte, 1024) - rand.Read(data) - reader := bytes.NewReader(data) + go func() { + for { + println("vacuum threshold", *garbageThreshold) + _, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold)) + if err != nil { + log.Fatalf("vacuum: %v", err) + } + time.Sleep(time.Second) + } + }() - targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + for i := 0; i < *repeat; i++ { + // create 2 files, and delete one of them - _, err = operation.Upload(targetUrl, fmt.Sprintf("test%d", i), reader, false, "", nil, assignResult.Auth) - if err != nil { - log.Fatalf("upload: %v", err) - } + assignResult, targetUrl := genFile(grpcDialOption, i) util.Delete(targetUrl, string(assignResult.Auth)) - util.Get(fmt.Sprintf("http://%s/vol/vacuum", *master)) + } + +} +func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) { + assignResult, err := operation.Assign(*master, grpcDialOption, &operation.VolumeAssignRequest{ + Count: 1, + Replication: *replication, + }) + if err != nil { + log.Fatalf("assign: %v", err) } + data := make([]byte, 1024) + rand.Read(data) + + targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + + _, err = operation.UploadData(targetUrl, fmt.Sprintf("test%d", i), false, data, false, "bench/test", nil, assignResult.Auth) + if err != nil { + log.Fatalf("upload: %v", err) + } + return assignResult, targetUrl } diff --git a/unmaintained/s3/benchmark/hsbench.sh b/unmaintained/s3/benchmark/hsbench.sh new file mode 100755 index 000000000..285b51405 --- /dev/null +++ b/unmaintained/s3/benchmark/hsbench.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +hsbench -a accesstoken -s secret -z 4K -d 10 -t 10 -b 10 -u http://localhost:8333 -m "cxipgdx" -bp "hsbench-" diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go new file mode 100644 index 000000000..e8368d124 --- /dev/null +++ b/unmaintained/s3/presigned_put/presigned_put.go @@ -0,0 +1,73 @@ +package main + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "encoding/base64" + "fmt" + "crypto/md5" + "strings" + "time" + "net/http" +) + +// Downloads an item from an S3 Bucket in the region configured in the shared config +// or AWS_REGION environment variable. +// +// Usage: +// go run presigned_put.go +// For this exampl to work, the domainName is needd +// weed s3 -domainName=localhost +func main() { + h := md5.New() + content := strings.NewReader(stringContent) + content.WriteTo(h) + + // Initialize a session in us-west-2 that the SDK will use to load + // credentials from the shared credentials file ~/.aws/credentials. + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String("http://localhost:8333"), + }) + + // Create S3 service client + svc := s3.New(sess) + + putRequest, output := svc.PutObjectRequest(&s3.PutObjectInput{ + Bucket: aws.String("dev"), + Key: aws.String("testKey"), + }) + fmt.Printf("output: %+v\n", output) + + md5s := base64.StdEncoding.EncodeToString(h.Sum(nil)) + putRequest.HTTPRequest.Header.Set("Content-MD5", md5s) + + url, err := putRequest.Presign(15 * time.Minute) + if err != nil { + fmt.Println("error presigning request", err) + return + } + + fmt.Println(url) + + req, err := http.NewRequest("PUT", url, strings.NewReader(stringContent)) + req.Header.Set("Content-MD5", md5s) + if err != nil { + fmt.Println("error creating request", url) + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Printf("error put request: %v\n", err) + return + } + fmt.Printf("response: %+v\n", resp) +} + +var stringContent = `Generate a Pre-Signed URL for an Amazon S3 PUT Operation with a Specific Payload +You can generate a pre-signed URL for a PUT operation that checks whether users upload the correct content. When the SDK pre-signs a request, it computes the checksum of the request body and generates an MD5 checksum that is included in the pre-signed URL. Users must upload the same content that produces the same MD5 checksum generated by the SDK; otherwise, the operation fails. This is not the Content-MD5, but the signature. To enforce Content-MD5, simply add the header to the request. + +The following example adds a Body field to generate a pre-signed PUT operation that requires a specific payload to be uploaded by users. +`
\ No newline at end of file diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go index efc58e751..17c494841 100644 --- a/unmaintained/see_dat/see_dat.go +++ b/unmaintained/see_dat/see_dat.go @@ -2,6 +2,7 @@ package main import ( "flag" + "github.com/chrislusf/seaweedfs/weed/util" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -31,7 +32,8 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second)) - glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x appendedAt %v", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie, t) + glog.V(0).Infof("%d,%s%x offset %d size %d(%s) cookie %x appendedAt %v", + *volumeId, n.Id, n.Cookie, offset, n.Size, util.BytesToHumanReadable(uint64(n.Size)), n.Cookie, t) return nil } diff --git a/unmaintained/see_idx/see_idx.go b/unmaintained/see_idx/see_idx.go index 777af1821..22c659351 100644 --- a/unmaintained/see_idx/see_idx.go +++ b/unmaintained/see_idx/see_idx.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "os" "path" "strconv" @@ -35,8 +36,8 @@ func main() { } defer indexFile.Close() - idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { - fmt.Printf("key:%v offset:%v size:%v\n", key, offset, size) + idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { + fmt.Printf("key:%v offset:%v size:%v(%v)\n", key, offset, size, util.BytesToHumanReadable(uint64(size))) return nil }) diff --git a/unmaintained/see_log_entry/see_log_entry.go b/unmaintained/see_log_entry/see_log_entry.go new file mode 100644 index 000000000..45480d4dc --- /dev/null +++ b/unmaintained/see_log_entry/see_log_entry.go @@ -0,0 +1,75 @@ +package main + +import ( + "flag" + "fmt" + "io" + "log" + "os" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + logdataFile = flag.String("logdata", "", "log data file saved under "+ filer.SystemLogDir) +) + +func main() { + flag.Parse() + + dst, err := os.OpenFile(*logdataFile, os.O_RDONLY, 0644) + if err != nil { + log.Fatalf("failed to open %s: %v", *logdataFile, err) + } + defer dst.Close() + + err = walkLogEntryFile(dst) + if err != nil { + log.Fatalf("failed to visit %s: %v", *logdataFile, err) + } + +} + +func walkLogEntryFile(dst *os.File) error { + + sizeBuf := make([]byte, 4) + + for { + if n, err := dst.Read(sizeBuf); n != 4 { + if err == io.EOF { + return nil + } + return err + } + + size := util.BytesToUint32(sizeBuf) + + data := make([]byte, int(size)) + + if n, err := dst.Read(data); n != len(data) { + return err + } + + logEntry := &filer_pb.LogEntry{} + err := proto.Unmarshal(data, logEntry) + if err != nil { + log.Printf("unexpected unmarshal filer_pb.LogEntry: %v", err) + return nil + } + + event := &filer_pb.SubscribeMetadataResponse{} + err = proto.Unmarshal(logEntry.Data, event) + if err != nil { + log.Printf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return nil + } + + fmt.Printf("event: %+v\n", event) + + } + +} diff --git a/unmaintained/see_meta/see_meta.go b/unmaintained/see_meta/see_meta.go index 0d2ac8de1..452badfd6 100644 --- a/unmaintained/see_meta/see_meta.go +++ b/unmaintained/see_meta/see_meta.go @@ -7,10 +7,10 @@ import ( "log" "os" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) var ( @@ -58,7 +58,7 @@ func walkMetaFile(dst *os.File) error { return err } - fmt.Fprintf(os.Stdout, "file %s %v\n", filer2.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name), fullEntry.Entry.Attributes.String()) + fmt.Fprintf(os.Stdout, "file %s %v\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name), fullEntry.Entry.Attributes.String()) for i, chunk := range fullEntry.Entry.Chunks { fmt.Fprintf(os.Stdout, " chunk %d %v\n", i+1, chunk.String()) } diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go index b2e4b28c6..2ee8028f2 100644 --- a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go +++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go @@ -45,7 +45,7 @@ func main() { defer wg.Done() client := &http.Client{Transport: &http.Transport{ - MaxConnsPerHost: 1024, + MaxIdleConns: 1024, MaxIdleConnsPerHost: 1024, }} r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x))) diff --git a/unmaintained/stress_filer_upload/write_files/write_files.go b/unmaintained/stress_filer_upload/write_files/write_files.go new file mode 100644 index 000000000..508e37d14 --- /dev/null +++ b/unmaintained/stress_filer_upload/write_files/write_files.go @@ -0,0 +1,54 @@ +package main + +import ( + "flag" + "fmt" + "math/rand" + "os" + "time" +) + +var ( + minSize = flag.Int("minSize", 1024, "min file size") + maxSize = flag.Int("maxSize", 3*1024*1024, "max file size") + fileCount = flag.Int("n", 1, "number of files to write") + blockSize = flag.Int("blockSizeKB", 4, "write block size") + toDir = flag.String("dir", ".", "destination directory") +) + +func check(e error) { + if e != nil { + panic(e) + } +} + +func main() { + + flag.Parse() + + block := make([]byte, *blockSize*1024) + + for i := 0; i < *fileCount; i++ { + + f, err := os.Create(fmt.Sprintf("%s/file%05d", *toDir, i)) + check(err) + + fileSize := *minSize + rand.Intn(*maxSize-*minSize) + startTime := time.Now() + + fmt.Printf("write %s %d bytes: ", f.Name(), fileSize) + + for x := 0; x < fileSize; { + rand.Read(block) + _, err = f.Write(block) + check(err) + x += len(block) + } + + err = f.Close() + check(err) + + fmt.Printf("%.02f MB/sec\n", float64(fileSize)*float64(time.Second)/float64(time.Now().Sub(startTime)*1024*1024)) + } + +} diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index f0ef51c09..32da2e6ab 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -9,7 +9,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" util2 "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" "golang.org/x/tools/godoc/util" ) @@ -25,7 +24,7 @@ func main() { flag.Parse() util2.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util2.GetViper(), "grpc.client") vid := needle.VolumeId(*volumeId) @@ -38,7 +37,7 @@ func main() { sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano() } - err := operation.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { + err := operation.TailVolume(func()string{return *master}, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { if n.Size == 0 { println("-", n.String()) return nil @@ -49,8 +48,8 @@ func main() { if *showTextFile { data := n.Data - if n.IsGzipped() { - if data, err = util2.UnGzipData(data); err != nil { + if n.IsCompressed() { + if data, err = util2.DecompressData(data); err != nil { return err } } @@ -58,7 +57,7 @@ func main() { println(string(data)) } - println("-", n.String(), "compressed", n.IsGzipped(), "original size", len(data)) + println("-", n.String(), "compressed", n.IsCompressed(), "original size", len(data)) } return nil }) |
