aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/cmd
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/cmd')
-rw-r--r--weed/mq/cmd/qsend/qsend.go62
1 files changed, 62 insertions, 0 deletions
diff --git a/weed/mq/cmd/qsend/qsend.go b/weed/mq/cmd/qsend/qsend.go
new file mode 100644
index 000000000..c80b220b8
--- /dev/null
+++ b/weed/mq/cmd/qsend/qsend.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/messages"
+ "os"
+ "time"
+)
+
+var (
+ master = flag.String("master", "localhost:9333", "master csv list")
+ topic = flag.String("topic", "", "topic name")
+)
+
+func main() {
+ flag.Parse()
+
+ publisher := client.NewPublisher(&client.PublisherOption{
+ Masters: *master,
+ Topic: *topic,
+ })
+
+ err := eachLineStdin(func(line string) error {
+ if len(line) > 0 {
+ if err := publisher.Publish(&messages.Message{
+ Key: nil,
+ Content: []byte(line),
+ Properties: nil,
+ Ts: time.Time{},
+ }); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+
+ publisher.Shutdown()
+
+ if err != nil {
+ fmt.Printf("error: %v\n", err)
+ }
+}
+
+func eachLineStdin(eachLineFn func(string) error) error {
+ scanner := bufio.NewScanner(os.Stdin)
+ for scanner.Scan() {
+ text := scanner.Text()
+ if err := eachLineFn(text); err != nil {
+ return err
+ }
+ }
+
+ // handle error
+ if scanner.Err() != nil {
+ return fmt.Errorf("scan stdin: %v", scanner.Err())
+ }
+
+ return nil
+}