aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/page_writer/page_chunk_mem.go2
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go2
-rw-r--r--weed/mq/broker/broker_grpc_configure.go1
-rw-r--r--weed/mq/broker/broker_grpc_pub.go5
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go9
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go2
-rw-r--r--weed/mq/client/cmd/weed_pub_kv/publisher_kv.go2
-rw-r--r--weed/mq/client/cmd/weed_pub_record/publisher_record.go2
-rw-r--r--weed/mq/client/pub_client/publish.go2
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go2
-rw-r--r--weed/mq/pub_balancer/broker_stats.go6
-rw-r--r--weed/mq/schema/schema.go6
-rw-r--r--weed/mq/schema/schema_builder.go6
-rw-r--r--weed/mq/schema/schema_test.go40
-rw-r--r--weed/mq/schema/struct_to_schema_test.go4
-rw-r--r--weed/mq/schema/to_parquet_levels.go16
-rw-r--r--weed/mq/schema/to_parquet_levels_test.go10
-rw-r--r--weed/mq/schema/to_parquet_schema.go1
-rw-r--r--weed/mq/schema/to_parquet_value.go2
-rw-r--r--weed/mq/schema/to_schema_value.go16
-rw-r--r--weed/mq/schema/write_parquet_test.go8
-rw-r--r--weed/mq/topic/local_partition.go16
-rw-r--r--weed/shell/command_volume_balance_test.go2
-rw-r--r--weed/topology/volume_growth.go19
-rw-r--r--weed/topology/volume_layout.go2
-rw-r--r--weed/util/log_buffer/log_buffer_test.go4
26 files changed, 92 insertions, 95 deletions
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
index ddb192b9b..b5c8f9ebd 100644
--- a/weed/mount/page_writer/page_chunk_mem.go
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -67,7 +67,7 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
maxStop = max(maxStop, logicStop)
if t.TsNs >= tsNs {
- println("read new data1", t.TsNs - tsNs, "ns")
+ println("read new data1", t.TsNs-tsNs, "ns")
}
}
}
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index 35e9ee682..350821757 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -137,7 +137,7 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in
maxStop = max(maxStop, logicStop)
if t.TsNs >= tsNs {
- println("read new data2", t.TsNs - tsNs, "ns")
+ println("read new data2", t.TsNs-tsNs, "ns")
}
}
}
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 40ac8df23..afb01a886 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -35,7 +35,6 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
}
-
t := topic.FromPbTopic(request.Topic)
var readErr, assignErr error
resp, readErr = b.readTopicConfFromFiler(t)
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index a217489de..d633a3efa 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -69,7 +69,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response)
}
- var receivedSequence, acknowledgedSequence int64
+ var receivedSequence, acknowledgedSequence int64
var isClosed bool
// start sending ack to publisher
@@ -85,7 +85,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
lastAckTime := time.Now()
for !isClosed {
receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
- if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){
+ if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
acknowledgedSequence = receivedSequence
response := &mq_pb.PublishMessageResponse{
AckSequence: acknowledgedSequence,
@@ -101,7 +101,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
}()
-
// process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index d8100f021..8995b0cc2 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -13,10 +13,11 @@ import (
)
type memBuffer struct {
- buf []byte
- startTime time.Time
- stopTime time.Time
+ buf []byte
+ startTime time.Time
+ stopTime time.Time
}
+
func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
var req *mq_pb.PublishFollowMeRequest
req, err = stream.Recv()
@@ -84,7 +85,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
}
}
-
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
logBuffer.ShutdownLogBuffer()
@@ -97,7 +97,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
-
// flush the remaining messages
inMemoryBuffers.CloseInput()
for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index 50470f879..cb55e2032 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -45,7 +45,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
b.accessLock.Lock()
defer b.accessLock.Unlock()
p := topic.FromPbPartition(partition)
- if localPartition:=b.localTopicManager.GetLocalPartition(t, p); localPartition!=nil {
+ if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}
diff --git a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
index 59842c21f..096b355a1 100644
--- a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
+++ b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
@@ -16,7 +16,7 @@ var (
concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count")
- clientName = flag.String("client", "c1", "client name")
+ clientName = flag.String("client", "c1", "client name")
namespace = flag.String("ns", "test", "namespace")
t = flag.String("t", "test", "t")
diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
index 4d4e5fc4d..a5fbd455e 100644
--- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go
+++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
@@ -19,7 +19,7 @@ var (
concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count")
- clientName = flag.String("client", "c1", "client name")
+ clientName = flag.String("client", "c1", "client name")
namespace = flag.String("ns", "test", "namespace")
t = flag.String("t", "test", "t")
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 0c162d6a0..a25620de1 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -48,7 +48,7 @@ func (p *TopicPublisher) FinishPublish() error {
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, inputBuffer := range inputBuffers {
inputBuffer.Enqueue(&mq_pb.DataMessage{
- TsNs: time.Now().UnixNano(),
+ TsNs: time.Now().UnixNano(),
Ctrl: &mq_pb.ControlMessage{
IsClose: true,
},
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index b0b533e42..2f1330b5e 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -122,7 +122,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
StartTsNs: sub.alreadyProcessedTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
- Filter: sub.ContentConfig.Filter,
+ Filter: sub.ContentConfig.Filter,
FollowerBrokers: assigned.FollowerBrokers,
},
},
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index 00f1f80ca..c579c275e 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -17,7 +17,7 @@ type BrokerStats struct {
}
type TopicPartitionStats struct {
topic.TopicPartition
- PublisherCount int32
+ PublisherCount int32
SubscriberCount int32
}
@@ -48,7 +48,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
},
},
- PublisherCount: topicPartitionStats.PublisherCount,
+ PublisherCount: topicPartitionStats.PublisherCount,
SubscriberCount: topicPartitionStats.SubscriberCount,
}
publisherCount += topicPartitionStats.PublisherCount
@@ -76,7 +76,7 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
UnixTimeNs: partition.UnixTimeNs,
},
},
- PublisherCount: 0,
+ PublisherCount: 0,
SubscriberCount: 0,
}
key := tps.TopicPartition.String()
diff --git a/weed/mq/schema/schema.go b/weed/mq/schema/schema.go
index 1f99dda5b..5fadf2cc2 100644
--- a/weed/mq/schema/schema.go
+++ b/weed/mq/schema/schema.go
@@ -6,17 +6,17 @@ import (
type Schema struct {
RecordType *schema_pb.RecordType
- fieldMap map[string]*schema_pb.Field
+ fieldMap map[string]*schema_pb.Field
}
func NewSchema(recordType *schema_pb.RecordType) (*Schema, error) {
- fieldMap := make( map[string]*schema_pb.Field)
+ fieldMap := make(map[string]*schema_pb.Field)
for _, field := range recordType.Fields {
fieldMap[field.Name] = field
}
return &Schema{
RecordType: recordType,
- fieldMap: fieldMap,
+ fieldMap: fieldMap,
}, nil
}
diff --git a/weed/mq/schema/schema_builder.go b/weed/mq/schema/schema_builder.go
index c64f8bb7c..db89ce34c 100644
--- a/weed/mq/schema/schema_builder.go
+++ b/weed/mq/schema/schema_builder.go
@@ -8,9 +8,9 @@ import (
var (
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}}
- TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
- TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
- TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
+ TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
+ TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
+ TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}}
)
diff --git a/weed/mq/schema/schema_test.go b/weed/mq/schema/schema_test.go
index 5398beb02..f7dc8ff55 100644
--- a/weed/mq/schema/schema_test.go
+++ b/weed/mq/schema/schema_test.go
@@ -32,10 +32,10 @@ func TestEnumScalarType(t *testing.T) {
func TestField(t *testing.T) {
field := &Field{
- Name: "field_name",
- Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
- FieldIndex: 1,
- IsRepeated: false,
+ Name: "field_name",
+ Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
+ FieldIndex: 1,
+ IsRepeated: false,
}
assert.NotNil(t, field)
}
@@ -44,32 +44,32 @@ func TestRecordType(t *testing.T) {
subRecord := &RecordType{
Fields: []*Field{
{
- Name: "field_1",
- Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
- FieldIndex: 1,
- IsRepeated: false,
+ Name: "field_1",
+ Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
+ FieldIndex: 1,
+ IsRepeated: false,
},
{
- Name: "field_2",
- Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_STRING}},
- FieldIndex: 2,
- IsRepeated: false,
+ Name: "field_2",
+ Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_STRING}},
+ FieldIndex: 2,
+ IsRepeated: false,
},
},
}
record := &RecordType{
Fields: []*Field{
{
- Name: "field_key",
- Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
- FieldIndex: 1,
- IsRepeated: false,
+ Name: "field_key",
+ Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
+ FieldIndex: 1,
+ IsRepeated: false,
},
{
- Name: "field_record",
- Type: &Type{Kind: &Type_RecordType{RecordType: subRecord}},
- FieldIndex: 2,
- IsRepeated: false,
+ Name: "field_record",
+ Type: &Type{Kind: &Type_RecordType{RecordType: subRecord}},
+ FieldIndex: 2,
+ IsRepeated: false,
},
},
}
diff --git a/weed/mq/schema/struct_to_schema_test.go b/weed/mq/schema/struct_to_schema_test.go
index d22939aef..fae27ecef 100644
--- a/weed/mq/schema/struct_to_schema_test.go
+++ b/weed/mq/schema/struct_to_schema_test.go
@@ -76,7 +76,7 @@ func TestStructToSchema(t *testing.T) {
RecordTypeBegin().
WithField("Field3", TypeString).
WithField("Field4", TypeInt32).
- RecordTypeEnd(),
+ RecordTypeEnd(),
).
RecordTypeEnd(),
},
@@ -104,7 +104,7 @@ func TestStructToSchema(t *testing.T) {
RecordTypeBegin().
WithField("Field6", TypeString).
WithField("Field7", TypeBytes).
- RecordTypeEnd(),
+ RecordTypeEnd(),
).RecordTypeEnd(),
).
RecordTypeEnd(),
diff --git a/weed/mq/schema/to_parquet_levels.go b/weed/mq/schema/to_parquet_levels.go
index 6c73563cd..f9fc073bb 100644
--- a/weed/mq/schema/to_parquet_levels.go
+++ b/weed/mq/schema/to_parquet_levels.go
@@ -7,9 +7,9 @@ import (
type ParquetLevels struct {
startColumnIndex int
- endColumnIndex int
- definitionDepth int
- levels map[string]*ParquetLevels
+ endColumnIndex int
+ definitionDepth int
+ levels map[string]*ParquetLevels
}
func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
@@ -19,7 +19,7 @@ func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
func toFieldTypeLevels(fieldType *schema_pb.Type, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
- return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
+ return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
case *schema_pb.Type_RecordType:
return toRecordTypeLevels(fieldType.GetRecordType(), startColumnIndex, definitionDepth)
case *schema_pb.Type_ListType:
@@ -35,15 +35,15 @@ func toFieldTypeListLevels(listType *schema_pb.ListType, startColumnIndex, defin
func toFieldTypeScalarLevels(scalarType schema_pb.ScalarType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
return &ParquetLevels{
startColumnIndex: startColumnIndex,
- endColumnIndex: startColumnIndex + 1,
- definitionDepth: definitionDepth,
+ endColumnIndex: startColumnIndex + 1,
+ definitionDepth: definitionDepth,
}, nil
}
func toRecordTypeLevels(recordType *schema_pb.RecordType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
recordTypeLevels := &ParquetLevels{
startColumnIndex: startColumnIndex,
- definitionDepth: definitionDepth,
- levels: make(map[string]*ParquetLevels),
+ definitionDepth: definitionDepth,
+ levels: make(map[string]*ParquetLevels),
}
for _, field := range recordType.Fields {
fieldTypeLevels, err := toFieldTypeLevels(field.Type, startColumnIndex, definitionDepth+1)
diff --git a/weed/mq/schema/to_parquet_levels_test.go b/weed/mq/schema/to_parquet_levels_test.go
index 150bfb146..5200c0e02 100644
--- a/weed/mq/schema/to_parquet_levels_test.go
+++ b/weed/mq/schema/to_parquet_levels_test.go
@@ -11,9 +11,9 @@ func TestToParquetLevels(t *testing.T) {
recordType *schema_pb.RecordType
}
tests := []struct {
- name string
- args args
- want *ParquetLevels
+ name string
+ args args
+ want *ParquetLevels
}{
{
name: "nested type",
@@ -25,13 +25,13 @@ func TestToParquetLevels(t *testing.T) {
RecordTypeBegin().
WithField("zName", TypeString).
WithField("emails", ListOf(TypeString)).
- RecordTypeEnd()).
+ RecordTypeEnd()).
WithField("Company", TypeString).
WithRecordField("Address",
RecordTypeBegin().
WithField("Street", TypeString).
WithField("City", TypeString).
- RecordTypeEnd()).
+ RecordTypeEnd()).
RecordTypeEnd(),
},
want: &ParquetLevels{
diff --git a/weed/mq/schema/to_parquet_schema.go b/weed/mq/schema/to_parquet_schema.go
index 077305948..196546a32 100644
--- a/weed/mq/schema/to_parquet_schema.go
+++ b/weed/mq/schema/to_parquet_schema.go
@@ -31,7 +31,6 @@ func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err e
return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
}
-
return dataType, err
}
diff --git a/weed/mq/schema/to_parquet_value.go b/weed/mq/schema/to_parquet_value.go
index 22a93de67..83740495b 100644
--- a/weed/mq/schema/to_parquet_value.go
+++ b/weed/mq/schema/to_parquet_value.go
@@ -70,7 +70,7 @@ func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *
return
}
-func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
+func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
switch value.Kind.(type) {
case *schema_pb.Value_BoolValue:
return parquet.BooleanValue(value.GetBoolValue()), nil
diff --git a/weed/mq/schema/to_schema_value.go b/weed/mq/schema/to_schema_value.go
index 93103cd1a..947a84310 100644
--- a/weed/mq/schema/to_schema_value.go
+++ b/weed/mq/schema/to_schema_value.go
@@ -47,7 +47,7 @@ func toRecordValue(recordType *schema_pb.RecordType, levels *ParquetLevels, valu
func toListValue(listType *schema_pb.ListType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (listValue *schema_pb.Value, endValueIndex int, err error) {
listValues := make([]*schema_pb.Value, 0)
var value *schema_pb.Value
- for ;valueIndex < len(values); {
+ for valueIndex < len(values) {
if values[valueIndex].Column() != levels.startColumnIndex {
break
}
@@ -67,19 +67,19 @@ func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, value
}
switch scalarType {
case schema_pb.ScalarType_BOOL:
- return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex+1, nil
+ return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex + 1, nil
case schema_pb.ScalarType_INT32:
- return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex+1, nil
+ return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex + 1, nil
case schema_pb.ScalarType_INT64:
- return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex+1, nil
+ return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex + 1, nil
case schema_pb.ScalarType_FLOAT:
- return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex+1, nil
+ return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex + 1, nil
case schema_pb.ScalarType_DOUBLE:
- return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex+1, nil
+ return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex + 1, nil
case schema_pb.ScalarType_BYTES:
- return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex+1, nil
+ return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex + 1, nil
case schema_pb.ScalarType_STRING:
- return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex+1, nil
+ return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex + 1, nil
}
return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType)
}
diff --git a/weed/mq/schema/write_parquet_test.go b/weed/mq/schema/write_parquet_test.go
index e5df8ecd9..5cfdec999 100644
--- a/weed/mq/schema/write_parquet_test.go
+++ b/weed/mq/schema/write_parquet_test.go
@@ -19,13 +19,13 @@ func TestWriteReadParquet(t *testing.T) {
RecordTypeBegin().
WithField("zName", TypeString).
WithField("emails", ListOf(TypeString)).
- RecordTypeEnd()).
+ RecordTypeEnd()).
WithField("Company", TypeString).
WithRecordField("Address",
RecordTypeBegin().
WithField("Street", TypeString).
WithField("City", TypeString).
- RecordTypeEnd()).
+ RecordTypeEnd()).
RecordTypeEnd()
fmt.Printf("RecordType: %v\n", recordType)
@@ -85,9 +85,9 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
fmt.Sprintf("john_%d@c.com", i),
fmt.Sprintf("john_%d@d.com", i),
fmt.Sprintf("john_%d@e.com", i)).
- RecordEnd()).
+ RecordEnd()).
SetString("Company", fmt.Sprintf("company_%d", i)).
- RecordEnd()
+ RecordEnd()
AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue)
if count < 10 {
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 54c122a0f..72e78d606 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -16,17 +16,17 @@ import (
)
type LocalPartition struct {
- ListenersWaits int64
- AckTsNs int64
+ ListenersWaits int64
+ AckTsNs int64
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
Partition
- LogBuffer *log_buffer.LogBuffer
- Publishers *LocalPartitionPublishers
- Subscribers *LocalPartitionSubscribers
+ LogBuffer *log_buffer.LogBuffer
+ Publishers *LocalPartitionPublishers
+ Subscribers *LocalPartitionSubscribers
followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn
@@ -37,7 +37,7 @@ var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
lp := &LocalPartition{
- Partition: partition,
+ Partition: partition,
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
}
@@ -155,8 +155,8 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{
- Topic: initMessage.Topic,
- Partition: initMessage.Partition,
+ Topic: initMessage.Topic,
+ Partition: initMessage.Partition,
},
},
}); err != nil {
diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go
index b8af4fb98..fb39e063f 100644
--- a/weed/shell/command_volume_balance_test.go
+++ b/weed/shell/command_volume_balance_test.go
@@ -278,7 +278,7 @@ func TestDeleteEmptySelection(t *testing.T) {
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
- if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 {
+ if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 {
fmt.Printf("empty volume %d from %s\n", v.Id, dn.Id)
}
}
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 9885fc2d1..44b8b6286 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -31,20 +31,20 @@ type VolumeGrowRequest struct {
}
type volumeGrowthStrategy struct {
- Copy1Count int
- Copy2Count int
- Copy3Count int
+ Copy1Count int
+ Copy2Count int
+ Copy3Count int
CopyOtherCount int
- Threshold float64
+ Threshold float64
}
var (
VolumeGrowStrategy = volumeGrowthStrategy{
- Copy1Count: 7,
- Copy2Count: 6,
- Copy3Count: 3,
+ Copy1Count: 7,
+ Copy2Count: 6,
+ Copy3Count: 3,
CopyOtherCount: 1,
- Threshold: 0.9,
+ Threshold: 0.9,
}
)
@@ -77,7 +77,8 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
// given copyCount, how many logical volumes to create
func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
switch copyCount {
- case 1: count = VolumeGrowStrategy.Copy1Count
+ case 1:
+ count = VolumeGrowStrategy.Copy1Count
case 2:
count = VolumeGrowStrategy.Copy2Count
case 3:
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 4467b2dc8..66f7118c9 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -381,7 +381,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, a
}
active++
info, _ := dn.GetVolumesById(v)
- if float64(info.Size) > float64(vl.volumeSizeLimit)* VolumeGrowStrategy.Threshold{
+ if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
crowded++
}
}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index 067a02ef4..a4947a611 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -19,7 +19,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
}, nil, func() {
})
- startTime := MessagePosition{Time:time.Now()}
+ startTime := MessagePosition{Time: time.Now()}
messageSize := 1024
messageCount := 5000
@@ -38,7 +38,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
println("processed all messages")
return true, io.EOF
}
- return false,nil
+ return false, nil
})
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)