aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
-rw-r--r--weed/mq/client/pub_client/scheduler.go10
1 files changed, 5 insertions, 5 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index a768fa7f8..40e8014c6 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -137,7 +137,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
stream, err := brokerClient.PublishMessage(context.Background())
if err != nil {
- return fmt.Errorf("create publish client: %v", err)
+ return fmt.Errorf("create publish client: %w", err)
}
publishClient := &PublishClient{
SeaweedMessaging_PublishMessageClient: stream,
@@ -154,12 +154,12 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
},
},
}); err != nil {
- return fmt.Errorf("send init message: %v", err)
+ return fmt.Errorf("send init message: %w", err)
}
// process the hello message
resp, err := stream.Recv()
if err != nil {
- return fmt.Errorf("recv init response: %v", err)
+ return fmt.Errorf("recv init response: %w", err)
}
if resp.Error != "" {
return fmt.Errorf("init response error: %v", resp.Error)
@@ -208,7 +208,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
Data: data,
},
}); err != nil {
- return fmt.Errorf("send publish data: %v", err)
+ return fmt.Errorf("send publish data: %w", err)
}
publishCounter++
atomic.StoreInt64(&publishedTsNs, data.TsNs)
@@ -218,7 +218,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
} else {
// CloseSend would cancel the context on the server side
if err := publishClient.CloseSend(); err != nil {
- return fmt.Errorf("close send: %v", err)
+ return fmt.Errorf("close send: %w", err)
}
}