aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client
AgeCommit message (Collapse)AuthorFilesLines
2025-07-16convert error fromating to %w everywhere (#6995)Chris Lu1-2/+2
2025-03-09Accumulated changes for message queue (#6600)Chris Lu4-28/+61
* rename * set agent address * refactor * add agent sub * pub messages * grpc new client * can publish records via agent * send init message with session id * fmt * check cancelled request while waiting * use sessionId * handle possible nil stream * subscriber process messages * separate debug port * use atomic int64 * less logs * minor * skip io.EOF * rename * remove unused * use saved offsets * do not reuse session, since always session id is new after restart remove last active ts from SessionEntry * simplify printing * purge unused * just proxy the subscription, skipping the session step * adjust offset types * subscribe offset type and possible value * start after the known tsns * avoid wrongly set startPosition * move * remove * refactor * typo * fix * fix changed path
2025-01-20Add message queue agent (#6463)Chris Lu3-55/+68
* scaffold message queue agent * adjust proto, add mq_agent * add agent client implementation * remove unused function * agent publish server implementation * adding agent
2024-11-04merge current message queue code changes (#6201)Chris Lu2-0/+13
* listing files to convert to parquet * write parquet files * save logs into parquet files * pass by value * compact logs into parquet format * can skip existing files * refactor * refactor * fix compilation * when no partition found * refactor * add untested parquet file read * rename package * refactor * rename files * remove unused * add merged log read func * parquet wants to know the file size * rewind by time * pass in stop ts * add stop ts * adjust log * minor * adjust log * skip .parquet files when reading message logs * skip non message files * Update subscriber_record.go * send messages * skip message data with only ts * skip non log files * update parquet-go package * ensure a valid record type * add new field to a record type * Update read_parquet_to_log.go * fix parquet file name generation * separating reading parquet and logs * add key field * add skipped logs * use in memory cache * refactor * refactor * refactor * refactor, and change compact log * refactor * rename * refactor * fix format * prefix v to version directory
2024-08-12fix buildchrislu1-1/+1
2024-05-30fixchrislu1-1/+0
2024-05-30stop partitionOffsetChan if closedchrislu1-0/+5
2024-05-29skip control messageschrislu1-0/+4
2024-05-27balance subscriberschrislu3-2/+10
need to ensure the offsets are persisted
2024-05-23go fmtchrislu2-8/+8
2024-05-23coordinator receives unassignment ackchrislu3-15/+33
2024-05-21client side stop partition subscribing if unassignedchrislu2-12/+27
2024-05-21fix compilationchrislu1-7/+2
2024-05-21refactorchrislu2-32/+37
ToDo 1. handle unassignment on client side 2. get notified of completed unassignment 3. in consumer_group.go, balance and send unassignment
2024-05-20refactorchrislu2-104/+112
2024-05-20remove ProcessorConfigurationchrislu3-11/+6
2024-05-20sending keyed offsetchrislu1-3/+12
2024-05-20minorchrislu1-7/+7
2024-05-20go fmtchrislu3-19/+18
2024-05-20track offsetchrislu3-4/+6
2024-05-19fixchrislu1-1/+1
2024-05-19ConcurrentPartitionLimitchrislu1-11/+19
2024-05-19start consuming ASAPchrislu1-1/+1
2024-05-19persist consumer group offsetchrislu2-5/+1
1. use one follower 2. read write consumer group offset
2024-05-16consumer acks received messageschrislu2-13/+32
2024-05-14subscriber receives partitions and dispatch to processorschrislu3-36/+94
2024-05-13consumer instance passing MaxPartitionCount to coordinatorchrislu2-1/+2
2024-05-10clean up unused variableschrislu1-3/+0
2024-03-24setup follower by publisherchrislu1-10/+11
* the subscriber would getOrGen a local partition and wait * the publisher would getOrGen a local partition. If localPartition follower is not setup, and init message has follower info, it would create followers based on init.Messages.
2024-02-05subscriber find broker leader firstChris Lu2-25/+34
2024-01-11adjust client side logschrislu1-10/+7
2024-01-08add batch index for each memory bufferchrislu1-1/+2
2024-01-05adjust logschrislu1-2/+2
2024-01-05refactorchrislu1-7/+8
2024-01-05rename functionschrislu1-2/+2
2024-01-05rename functionschrislu1-3/+3
2024-01-05rename functionschrislu1-4/+4
2024-01-03subscriber can be notified of the assignment change when topic is just ↵chrislu1-2/+2
configured Next: Subscriber needs to read by the timestamp offset.
2024-01-01log errorschrislu1-3/+6
2023-12-31adjust wait timechrislu1-5/+10
2023-12-31clean up dead codechrislu5-162/+75
2023-12-28passing broker into the assignmentschrislu1-2/+1
2023-12-28subscriber can get assignmentschrislu1-7/+7
2023-12-28subscriber keep connected to the balancerchrislu4-7/+109
2023-12-11Merge accumulated changes related to message queue (#5098)Chris Lu3-8/+33
* balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-10-02add subscriber coordinatorchrislu2-5/+12
2023-10-01refactorchrislu3-66/+98
2023-09-30adjust mq.protochrislu1-14/+14
2023-09-04api for subchrislu3-27/+45
2023-09-01can pub and subchrislu3-66/+105