So far, RocketMQ supports message filtering only by TAG, and a message can carry only one tag, which is too limited to meet complex business requirements.
So we want to define and implement a filter language based on a subset of the SQL 92 expression syntax to support customized message filtering.
Why Subset Of SQL92
The purpose of this issue is to give RocketMQ the ability to filter messages. SQL92 is widely used and most people are familiar with it, so it is reasonable to choose it as RocketMQ’s filter grammar.
As far as I know, ActiveMQ already implements this functionality based on JavaCC, and its implementation is simple and extensible. So I extracted it and integrated it into RocketMQ, keeping only part of the grammar:
- Numeric comparison, like
- Character comparison, like
IS NOT NULL;
Constant types are:
- Numeric, like 123, 3.1415;
- Character, like 'abc', must be enclosed in single quotes;
- NULL, special constant;
- The broker collects each consumer’s expression through heartbeat requests and saves it in
- When a consumer pulls messages, the broker constructs a MessageFilter (an interface) with the compiled expression and subscription data to select matched messages in
The main logic is simple.
- New Module, rocketmq-filter
The implementation of the SQL92 language is placed in this module, which depends on the common module.
The broker compiles and evaluates expressions through the FilterSpi interface. FilterSpi implementations are held in FilterFactory, which manages all of them and also supports registering new ones.
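The registry idea can be sketched as follows. This is a minimal illustration, not the actual rocketmq-filter code: only the names FilterSpi and FilterFactory come from the text, while the method signatures, CompiledExpression, and the toy NotNullDemoFilter language are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical compiled form of a filter expression. */
interface CompiledExpression {
    boolean matches(Map<String, String> properties);
}

/** One implementation per filter language; the method names are assumptions. */
interface FilterSpi {
    String ofType();

    CompiledExpression compile(String expression);
}

/** Registry of filter languages keyed by type name. */
final class FilterFactory {
    private final Map<String, FilterSpi> filters = new ConcurrentHashMap<>();

    /** Registers a new language; a duplicate type name is rejected. */
    void register(FilterSpi spi) {
        if (filters.putIfAbsent(spi.ofType(), spi) != null) {
            throw new IllegalArgumentException("Filter type already registered: " + spi.ofType());
        }
    }

    /** Returns the language registered under the given type, or null if none. */
    FilterSpi get(String type) {
        return filters.get(type);
    }
}

/** Toy language for illustration only: accepts "<key> IS NOT NULL". */
final class NotNullDemoFilter implements FilterSpi {
    private static final String SUFFIX = " IS NOT NULL";

    @Override
    public String ofType() {
        return "NOT_NULL_DEMO";
    }

    @Override
    public CompiledExpression compile(String expression) {
        if (!expression.endsWith(SUFFIX)) {
            throw new IllegalArgumentException("Illegal expression: " + expression);
        }
        String key = expression.substring(0, expression.length() - SUFFIX.length()).trim();
        if (key.isEmpty()) {
            throw new IllegalArgumentException("Missing property name: " + expression);
        }
        // The compiled form only looks up one property, so evaluation is cheap.
        return properties -> properties.get(key) != null;
    }
}
```

With this shape, the broker only needs the type name carried in the subscription to look up the right language, and a new language can be added without touching broker code.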
- How to manage consumers’ expression data
Unlike tag filtering, a SQL92 expression must be compiled first to check whether it is legal, and the compiled expression is then used for evaluation. This procedure is designed to take place at the broker.
ConsumerManager manages the subscriptions of push consumers, and ConsumerFilterManager manages the expression info of push consumers that wish to filter messages by a special language. The info includes data version, expression, compiled expression, alive time, etc.
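A rough sketch of the compile-then-store step is shown below. The class and field names (ConsumerFilterManagerSketch, FilterRecord, registeredAtMillis) and the rule that an older data version never overwrites a newer one are assumptions for illustration. The compiler is injected as a plain function so the example stays self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical record of one consumer group's filter on one topic. */
final class FilterRecord {
    final String expression;                       // raw text sent by the consumer
    final Predicate<Map<String, String>> compiled; // reused for every evaluation
    final long dataVersion;                        // version reported by the client
    final long registeredAtMillis;                 // assumed stand-in for "alive time"

    FilterRecord(String expression, Predicate<Map<String, String>> compiled,
                 long dataVersion, long registeredAtMillis) {
        this.expression = expression;
        this.compiled = compiled;
        this.dataVersion = dataVersion;
        this.registeredAtMillis = registeredAtMillis;
    }
}

final class ConsumerFilterManagerSketch {
    private final Map<String, FilterRecord> records = new ConcurrentHashMap<>();
    private final Function<String, Predicate<Map<String, String>>> compiler;

    ConsumerFilterManagerSketch(Function<String, Predicate<Map<String, String>>> compiler) {
        this.compiler = compiler;
    }

    /** Returns false and stores nothing when the expression does not compile. */
    boolean register(String topic, String group, String expression, long dataVersion) {
        String key = topic + "@" + group;
        FilterRecord existing = records.get(key);
        if (existing != null && existing.dataVersion >= dataVersion) {
            return true; // stale or repeated heartbeat: keep the current record
        }
        Predicate<Map<String, String>> compiled;
        try {
            compiled = compiler.apply(expression); // legality check happens here
        } catch (RuntimeException illegalExpression) {
            return false;
        }
        records.put(key, new FilterRecord(expression, compiled, dataVersion,
                System.currentTimeMillis()));
        return true;
    }

    FilterRecord get(String topic, String group) {
        return records.get(topic + "@" + group);
    }
}
```

Compiling once at registration means an illegal expression is rejected before any message is evaluated against it.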
- How to filter messages by expression
I redesigned the MessageStore interface by replacing the last parameter with MessageFilter, which is also refactored. The purpose is to keep the rocketmq-store module independent of the protocol.
When getting messages, the ExpressionMessageFilter implementation checks whether a message matches, either by BitsArray (described later) or by evaluation, just like the mechanism of tag filtering.
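The decoupling described above can be illustrated with a small sketch in which the store only sees a filter callback and never the subscription or protocol types. The method names isMatchedByIndex and isMatchedByProperties, and the classes StoredMessage and MessageStoreSketch, are hypothetical. They are not the real MessageStore or MessageFilter signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical store-side callback; the store knows nothing about subscriptions. */
interface MessageFilter {
    /** Cheap check against index data, done before the message body is read. */
    boolean isMatchedByIndex(long indexCode);

    /** Precise check against the decoded message properties. */
    boolean isMatchedByProperties(Map<String, String> properties);
}

final class StoredMessage {
    final long indexCode;
    final Map<String, String> properties;

    StoredMessage(long indexCode, Map<String, String> properties) {
        this.indexCode = indexCode;
        this.properties = properties;
    }
}

final class MessageStoreSketch {
    private final List<StoredMessage> log = new ArrayList<>();

    void put(StoredMessage message) {
        log.add(message);
    }

    /** The last parameter is the filter itself, not a protocol-level subscription. */
    List<StoredMessage> getMessage(MessageFilter filter) {
        List<StoredMessage> matched = new ArrayList<>();
        for (StoredMessage message : log) {
            if (!filter.isMatchedByIndex(message.indexCode)) {
                continue; // rejected early, properties are never decoded
            }
            if (filter.isMatchedByProperties(message.properties)) {
                matched.add(message);
            }
        }
        return matched;
    }
}
```

A tag filter and an expression filter can then be two implementations of the same callback, which is what lets both share one mechanism.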
- Optimization: pre-calculate the filtering result when building the consume queue
Filtering at pull time performs poorly:
- Data is copied from off-heap to heap once for every consumer subscribed to the same topic that pulls the message.
- Message properties are decoded once for every consumer subscribed to the same topic that pulls the message.
BloomFilter and pre-calculation are adopted to improve this situation:
- Every consumer is assigned some bit positions of the BloomFilter when it registers with the broker.
- When the broker builds the queue after a message is written into CommitLog, each consumer’s filtering result is calculated, and all results are assembled as a
- ConsumeQueueExt is a store file linked to ConsumeQueue, which can find the data by the tagsCode, which has already been replaced by the address (for compatibility, the range is Long.MIN_VALUE to Integer.MIN_VALUE) generated by
- ExpressionMessageFilter can use the BitsArray to check whether the message is matched. Because of BloomFilter collisions, it still needs to decode the properties and evaluate the expression for messages that appear matched (this could be reduced by checking for collisions, which is not included in this edition).
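The steps above can be sketched as follows. This is a simplified illustration under several assumptions: java.util.BitSet stands in for BitsArray, bit positions are derived from the consumer name by double hashing, and the class BloomPrecalcSketch and its methods are hypothetical. The actual position assignment and storage layout may differ.

```java
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

final class BloomPrecalcSketch {
    private final int numBits;   // size of the per-message bit array
    private final int numHashes; // bit positions assigned to each consumer
    private final Map<String, Predicate<Map<String, String>>> consumers = new LinkedHashMap<>();

    BloomPrecalcSketch(int numBits, int numHashes) {
        if (numBits < 1 || numHashes < 1) {
            throw new IllegalArgumentException("numBits and numHashes must be positive");
        }
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** Registration: remember the consumer's compiled filter. */
    void register(String consumer, Predicate<Map<String, String>> filter) {
        consumers.put(consumer, filter);
    }

    /** Bit positions of a consumer, derived from its name (assumed scheme). */
    int[] positionsOf(String consumer) {
        int h1 = consumer.hashCode();
        int h2 = (h1 >>> 16) | 1; // odd step so successive positions differ
        int[] positions = new int[numHashes];
        for (int i = 0; i < numHashes; i++) {
            positions[i] = Math.floorMod(h1 + i * h2, numBits);
        }
        return positions;
    }

    /** Queue build time: evaluate every filter once and record matches as bits. */
    BitSet precalculate(Map<String, String> properties) {
        BitSet bits = new BitSet(numBits);
        for (Map.Entry<String, Predicate<Map<String, String>>> entry : consumers.entrySet()) {
            if (entry.getValue().test(properties)) {
                for (int position : positionsOf(entry.getKey())) {
                    bits.set(position);
                }
            }
        }
        return bits;
    }

    /** Pull time: a cleared bit proves "not matched"; all bits set only means "maybe". */
    boolean mightMatch(String consumer, BitSet bits) {
        for (int position : positionsOf(consumer)) {
            if (!bits.get(position)) {
                return false;
            }
        }
        return true;
    }
}
```

When mightMatch returns true, the caller still has to decode the properties and evaluate the expression, because another consumer's bits may happen to cover the same positions. When it returns false, the message can be skipped without decoding anything.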
This optimization is suitable for:
- High subscription ratio.
- Large properties.
This optimization is off by default. To switch it on, set the following configs when starting the broker:
- enableCalcFilterBitMap = true, means to calculate the bitmap when building the consume queue.
- expectConsumerNumUseFilter = XX (Integer, default is 32), means the estimated number of consumers subscribing to the same topic.
- maxErrorRateOfBloomFilter = XX (1~100, default is 20), means the error rate of the bloom filter.
- enableConsumeQueueExt = true, means to construct the consume queue extension file.
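As an illustration, assuming these options are placed in the properties file that the broker loads at startup (the file name and location are not specified here), switching the optimization on with the default sizing might look like this:

```properties
# Calculate the filter bitmap when building the consume queue
enableCalcFilterBitMap=true
# Estimated number of consumers subscribing to the same topic (default 32)
expectConsumerNumUseFilter=32
# Error rate of the bloom filter, range 1~100 (default 20)
maxErrorRateOfBloomFilter=20
# Construct the consume queue extension file
enableConsumeQueueExt=true
```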
Only push consumers can filter messages by SQL92 expression in this edition. The interface is:
public void subscribe(final String topic, final MessageSelector messageSelector)
Configuration of the broker machine: 32 cores, 128 GB memory, 1000 Mb/s full-duplex dual network.
The producer sends messages with a 1 KB body and 1 KB of properties.
Five consumers consume messages through the push model, and each consumer gets 1/5 of the total messages.
CPU usage and GC frequency are about 30% lower when the filtering result is pre-calculated.