Filter Messages By SQL92 In RocketMQ
So far, RocketMQ only supports message filtering by TAG, but a message can carry only one tag, which is too limited to meet complex business requirements.
So we want to define and implement a reasonable filter language, based on a subset of the SQL 92 expression syntax, to support customized message filtering.
Why Subset Of SQL92
The purpose of this issue is to give RocketMQ the ability to filter messages, and as we know, SQL92 is widely used and most people are familiar with it. It’s reasonable to choose it as RocketMQ’s filter grammar.
As far as I know, ActiveMQ has already implemented this functionality based on JavaCC; it’s simple and extensible. So I just extracted it and integrated it into RocketMQ, keeping only some of the grammar:
- Numeric comparison, like >, >=, <, <=, BETWEEN, =;
- Character comparison, like =, <>, IN;
- IS NULL or IS NOT NULL;
- Logical AND, logical OR, logical NOT;
Constant types are:
- Numeric, like 123, 3.1415;
- Character, like 'abc', must be marked with single quotes;
- NULL, special constant;
- Boolean, TRUE or FALSE;
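To make the grammar above concrete, here is a minimal sketch of how an expression such as a BETWEEN 0 AND 3 AND region IN ('hz', 'sh') could be evaluated against message properties once it has been parsed. All names here (FilterExprSketch, Expr, demo) are hypothetical, and the tree is built by hand; the actual module generates its parser with JavaCC. The sketch also treats a missing property as "no match" instead of implementing SQL's three-valued logic.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; these are not the rocketmq-filter classes.
class FilterExprSketch {
    /** A compiled boolean expression evaluated against message properties. */
    interface Expr {
        boolean matches(Map<String, Object> props);
    }

    /** prop BETWEEN low AND high (numeric, both bounds inclusive). */
    static Expr between(String prop, double low, double high) {
        return props -> {
            Object v = props.get(prop);
            if (!(v instanceof Number)) {
                return false; // simplification: missing or non-numeric never matches
            }
            double d = ((Number) v).doubleValue();
            return d >= low && d <= high;
        };
    }

    /** prop IN ('x', 'y', ...) (character comparison). */
    static Expr in(String prop, String... values) {
        Set<String> allowed = new HashSet<>(Arrays.asList(values));
        return props -> allowed.contains(props.get(prop));
    }

    static Expr isNull(String prop) { return props -> props.get(prop) == null; }
    static Expr and(Expr l, Expr r) { return props -> l.matches(props) && r.matches(props); }
    static Expr or(Expr l, Expr r) { return props -> l.matches(props) || r.matches(props); }
    static Expr not(Expr e) { return props -> !e.matches(props); }

    /** Evaluates: a BETWEEN 0 AND 3 AND region IN ('hz', 'sh') */
    static boolean demo(Object a, Object region) {
        Expr expr = and(between("a", 0, 3), in("region", "hz", "sh"));
        Map<String, Object> props = new HashMap<>();
        props.put("a", a);
        props.put("region", region);
        return expr.matches(props);
    }
}
```

Calling FilterExprSketch.demo(2, "hz") matches, while a value of a outside the range, or a region not in the list, does not.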
Design
- Structure
- The broker collects each consumer’s expression through the heartbeat request and saves it in ConsumerFilterManager.
- When a consumer pulls messages, the broker constructs a MessageFilter (an interface) with the compiled expression and subscription data to select matched messages in CommitLog.
The main logic is simple.
- New Module, rocketmq-filter
The implementation of the SQL92 language is placed in this module, which depends on the common module.
The broker compiles and evaluates expressions through the FilterSpi interface. FilterFactory manages all FilterSpi implementations and also supports registering new ones.
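The registry idea can be sketched as follows. Only the names FilterSpi and FilterFactory come from the text; the method names, the CompiledExpr type, and the toy implementation that understands nothing but key = 'value' are assumptions made for illustration and are not the real rocketmq-filter API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a type-keyed registry of filter implementations.
class FilterFactorySketch {
    /** Result of compiling an expression (assumed shape). */
    interface CompiledExpr {
        boolean evaluate(Map<String, String> properties);
    }

    /** One filter language implementation (assumed method names). */
    interface FilterSpi {
        String ofType();
        CompiledExpr compile(String expression);
    }

    private final Map<String, FilterSpi> spis = new ConcurrentHashMap<>();

    /** Registers a new implementation; rejects a duplicate type. */
    void register(FilterSpi spi) {
        if (spis.putIfAbsent(spi.ofType(), spi) != null) {
            throw new IllegalArgumentException("already registered: " + spi.ofType());
        }
    }

    /** Returns the implementation for a type, or null if none is registered. */
    FilterSpi get(String type) {
        return spis.get(type);
    }

    /** Toy implementation that only understands: key = 'value' */
    static FilterSpi equalsOnlySpi() {
        return new FilterSpi() {
            @Override
            public String ofType() {
                return "SQL92";
            }

            @Override
            public CompiledExpr compile(String expression) {
                String[] parts = expression.split("=", 2);
                if (parts.length != 2) {
                    throw new IllegalArgumentException("unsupported: " + expression);
                }
                String key = parts[0].trim();
                String value = parts[1].trim();
                if (value.length() < 2 || !value.startsWith("'") || !value.endsWith("'")) {
                    throw new IllegalArgumentException("expected quoted literal: " + value);
                }
                String literal = value.substring(1, value.length() - 1);
                return props -> literal.equals(props.get(key));
            }
        };
    }
}
```

Compiling at registration time of the subscription, rather than at pull time, is what lets an illegal expression be rejected early.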
- How to manage consumer’s expression data
Unlike tag filtering, an SQL92 expression must be compiled first to check whether it is legal, and the compiled expression is then used for evaluation. This procedure is designed to take place at the broker.
ConsumerManager manages the subscriptions of push consumers, and ConsumerFilterManager manages the expression info of push consumers that wish to filter messages with this special language. The info includes the data version, expression, compiled expression, alive time, etc.
- How to filter message by expression
I redesigned the getMessage interface of MessageStore by replacing its last parameter, SubscriptionData, with MessageFilter, which is also refactored. The purpose is to keep the rocketmq-store module independent of the protocol.
When getting messages, the implementation ExpressionMessageFilter checks whether a message is matched either by the BitsArray, which is described later, or by evaluation, just like the mechanism of tag filtering.
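A minimal sketch of why passing a MessageFilter decouples the store from the protocol: the store loop below only calls the filter and never sees subscription types. The two-method shape, the StoredMessage type, and the simplified getMessage signature are assumptions for illustration, not the actual rocketmq-store interfaces.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch; signatures are simplified assumptions.
class MessageFilterSketch {
    /** Store-side view of a filter; no protocol types appear here. */
    interface MessageFilter {
        /** Cheap check using only what the consume queue entry holds. */
        boolean isMatchedByConsumeQueue(long tagsCode);

        /** Precise check using the decoded message properties. */
        boolean isMatchedByCommitLog(Map<String, String> properties);
    }

    /** Stand-in for a stored message: tag hash plus properties. */
    static final class StoredMessage {
        final long tagsCode;
        final Map<String, String> properties;

        StoredMessage(long tagsCode, Map<String, String> properties) {
            this.tagsCode = tagsCode;
            this.properties = properties;
        }
    }

    static StoredMessage message(String tag, String key, String value) {
        return new StoredMessage(tag.hashCode(), Collections.singletonMap(key, value));
    }

    /** Tag filtering: decided from the tag hash in the consume queue entry. */
    static MessageFilter byTag(String tag) {
        final long code = tag.hashCode();
        return new MessageFilter() {
            @Override
            public boolean isMatchedByConsumeQueue(long tagsCode) {
                return tagsCode == code;
            }

            @Override
            public boolean isMatchedByCommitLog(Map<String, String> properties) {
                return true;
            }
        };
    }

    /** Expression filtering: the queue entry cannot decide, so evaluate on properties. */
    static MessageFilter byExpression(Predicate<Map<String, String>> compiled) {
        return new MessageFilter() {
            @Override
            public boolean isMatchedByConsumeQueue(long tagsCode) {
                return true;
            }

            @Override
            public boolean isMatchedByCommitLog(Map<String, String> properties) {
                return compiled.test(properties);
            }
        };
    }

    /** Stand-in for MessageStore.getMessage: the store only talks to the filter. */
    static List<StoredMessage> getMessage(List<StoredMessage> queue, MessageFilter filter) {
        List<StoredMessage> result = new ArrayList<>();
        for (StoredMessage m : queue) {
            if (filter.isMatchedByConsumeQueue(m.tagsCode)
                    && filter.isMatchedByCommitLog(m.properties)) {
                result.add(m);
            }
        }
        return result;
    }
}
```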
- Optimization, pre-calculate the filtering result when building the consume queue
Filtering at pull time performs poorly:
- off-heap to heap copy, once for every consumer subscribed to the same topic that pulls the message.
- decoding of message properties, once for every consumer subscribed to the same topic that pulls the message.
BloomFilter and pre-calculation are adopted to optimize this situation:
- Every consumer is assigned some bit positions of the BloomFilter when it registers to the broker.
- When the broker builds the consume queue after a message is written into CommitLog, each consumer’s filtering result is calculated, and all results are assembled into a BitsArray saved in ConsumeQueueExt.
- ConsumeQueueExt is a store file linked to ConsumeQueue. ConsumeQueue can find the data by the tagsCode, which is replaced by the address generated by ConsumeQueueExt (for compatibility, the range is Long.MIN_VALUE to Integer.MIN_VALUE).
- ExpressionMessageFilter can use the BitsArray to check whether the message is matched. Because of BloomFilter collisions, it still needs to decode the properties and evaluate the expression for messages that appear matched (this could be reduced by checking for collisions, which is not included in this edition).
This optimization is suitable for:
- High subscription ratio.
- Large properties.
This optimization is off by default. To switch it on, set these configs when starting the broker:
- enableCalcFilterBitMap = true, means to calculate the bitmap when building the consume queue.
- expectConsumerNumUseFilter = XX (Integer, default is 32), means the estimated number of consumers subscribing to the same topic.
- maxErrorRateOfBloomFilter = XX (1~100, default is 20), means the error rate of the bloom filter.
- enableConsumeQueueExt = true, means to construct the consume queue extend file.
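To get a feel for how expectConsumerNumUseFilter and maxErrorRateOfBloomFilter interact, the sketch below applies the textbook Bloom filter sizing formulas. It assumes the error rate config is a percentage (so 20 means 0.20); the class and method names are hypothetical, and the broker's own calculation may round or align differently.

```java
// Hypothetical sketch using the standard Bloom filter sizing formulas.
class BloomSizingSketch {
    /** Bits needed for n entries at false-positive rate p: m = -n * ln(p) / (ln 2)^2. */
    static int bits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /** Number of bit positions per entry: k = (m / n) * ln 2, at least 1. */
    static int hashes(int n, int m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        int n = 32;                // expectConsumerNumUseFilter default
        double p = 20 / 100.0;     // maxErrorRateOfBloomFilter default, read as a percentage
        int m = bits(n, p);
        System.out.println("bits per message: " + m + ", positions per consumer: " + hashes(n, m));
    }
}
```

With the defaults this comes to roughly a hundred bits per message; lowering the error rate increases the size of each ConsumeQueueExt entry, which is the trade-off these two settings control.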
Interface
In this edition, only push consumers can filter messages by SQL92 expression. The interface is:
public void subscribe(final String topic, final MessageSelector messageSelector)
Performance Comparison
Configuration of the broker machine: 32 cores, 128G memory, 1000Mb/s full duplex dual network
The producer sends messages with a 1k body and 1k properties.
Five consumers consume messages through the push model; every consumer gets 1/5 of the total messages.
CPU usage and GC frequency are about 30% lower when the filtering result is pre-calculated.