diff options
| author | joeslay <54322500+joeslay@users.noreply.github.com> | 2019-10-14 16:03:40 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-10-14 16:03:40 +0100 |
| commit | d53aee179b6624b981caa7e702425ec260808be9 (patch) | |
| tree | 0e641a775fe2ba398283282b9f9a109986e41f58 /weed/server | |
| parent | baa813ee3012d52a4b861cb61a2ef87f94e5b127 (diff) | |
| parent | 50e885da45a58180eeb0098e5446d252181964fc (diff) | |
| download | seaweedfs-d53aee179b6624b981caa7e702425ec260808be9.tar.xz seaweedfs-d53aee179b6624b981caa7e702425ec260808be9.zip | |
Merge pull request #10 from chrislusf/master
merge seaweed master
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_grpc_query.go | 69 |
2 files changed, 69 insertions, 4 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 8b39146ee..711a3ebad 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -257,7 +257,3 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v return nil } - -func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) { - -} diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go new file mode 100644 index 000000000..767e28e7b --- /dev/null +++ b/weed/server/volume_grpc_query.go @@ -0,0 +1,69 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/query/json" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/tidwall/gjson" +) + +func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error { + + for _, fid := range req.FromFileIds { + + vid, id_cookie, err := operation.ParseFileId(fid) + if err != nil { + glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err) + return err + } + + n := new(needle.Needle) + volumeId, _ := needle.NewVolumeId(vid) + n.ParsePath(id_cookie) + + cookie := n.Cookie + if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { + glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err) + return err + } + + if n.Cookie != cookie { + glog.V(0).Infof("volume query failed to read fid cookie %s: %v", fid, err) + return err + } + + if req.InputSerialization.CsvInput != nil { + + } + + if req.InputSerialization.JsonInput != nil { + + stripe := &volume_server_pb.QueriedStripe{ + Records: nil, + } + + filter := json.Query{ + Field: req.Filter.Field, + Op: req.Filter.Operand, + Value: req.Filter.Value, + } + gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool { + passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter) + if !passedFilter { + return true + } + stripe.Records = json.ToJson(stripe.Records, req.Selections, values) + return true + }) + err = stream.Send(stripe) + if err != nil { + return err + } + } + + } + + return nil +} |
