
PIP 94: Message converter at broker level #11962

Closed

BewareMyPower opened this issue Sep 8, 2021 · 31 comments

@BewareMyPower
Contributor

BewareMyPower commented Sep 8, 2021

Motivation

The initial motivation came from Kafka's protocol handler, i.e. KoP (https://github.com/streamnative/kop). KoP allows Kafka producers to set the entry format to kafka, which means messages from Kafka producers can be written to bookies directly without any conversion between Kafka's message format and Pulsar's message format. This improves performance significantly. However, it also introduces the limitation that Pulsar consumers cannot consume such a topic because they cannot recognize the entry format.

This proposal introduces a message converter, which is responsible for converting the buffer of an entry to a format that Pulsar consumers can recognize. Once a converter is configured, before messages are dispatched to Pulsar consumers, the converter checks whether the buffer needs to be converted and performs the conversion if necessary. We can configure multiple converters because multiple protocol handlers can be configured as well, and each protocol handler may write entries in its own format.

The benefits after this change:

  • When other clients write an entry, no conversion is needed.
  • When other clients read an entry, no conversion is needed.
  • When a Pulsar consumer reads an entry, the conversion is performed in the broker.

Before this change, if we wanted Pulsar consumers to interoperate with other clients:

  • When other clients write an entry, we need to convert it to the Pulsar format.
  • When other clients read an entry, we need to convert it from the Pulsar format to the specific format.
  • When a Pulsar consumer reads an entry, no conversion is needed.

This proposal is mainly for protocol handlers, because they can access PersistentTopic and write bytes to bookies directly. In the rare case that users write directly to the topic's ledger with a BookKeeper client, the converter can also handle that.

Goal

This proposal's goal is only to add message converters at the broker level. Once the related broker configs are enabled, the converters are applied to all topics. This brings some overhead to topics that are created only for Pulsar clients, because we need to check whether each buffer needs to be converted. See the MessageConverter#accept method in the next section.

In the future, we could configure message converters at the namespace level or topic level. We could even configure the message converter on the Pulsar client so that the conversion happens only at the client side and the broker's CPU overhead is reduced.

API changes

First, an interface is added under the package org.apache.pulsar.common.api.raw:

package org.apache.pulsar.common.api.raw;

import java.io.IOException;

import io.netty.buffer.ByteBuf;

public interface MessageConverter {

    /**
     * Initialize some resources if necessary.
     *
     * @apiNote it should only be called once
     */
    default void init() throws IOException {
        // No-op by default
    }

    /**
     * Release the resources if they were initialized in init().
     */
    default void close() throws IOException {
        // No-op by default
    }

    /**
     * Determine whether the buffer can be converted
     *
     * @param buffer the buffer that might need to be converted
     * @return whether the buffer can be converted
     * @implNote it should treat `buffer` as a read-only buffer, i.e. the buffer's writer index won't change.
     *   However, the buffer's reader index might change in this method, so the caller side should call
     *   {@link ByteBuf#markReaderIndex()} to mark the original reader index if needed.
     */
    boolean accept(ByteBuf buffer);

    /**
     * Convert the buffer to the format that Pulsar consumer can recognize.
     *
     * @param originalBuffer the original buffer
     * @return the converted buffer
     * @implNote it can return either `originalBuffer` itself or a new allocated buffer. When it returns
     *   `originalBuffer` itself, the reference count must increase by 1. Therefore, we must call
     *   {@link ByteBuf#release()} to release the returned value in any case.
     *   The reader index and writer index might change in this method, so the caller side should call
     *   {@link ByteBuf#markReaderIndex()} or {@link ByteBuf#markWriterIndex()} if needed.
     */
    ByteBuf convert(ByteBuf originalBuffer);
}
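
For clarity, here is a hedged sketch of the caller-side contract implied by the javadoc above: the caller marks the reader index around accept(), always releases the buffer returned by convert(), and prefers the first accepting converter in the list. The dispatch() helper is hypothetical, not part of the proposal.

import java.util.List;

import io.netty.buffer.ByteBuf;

public class ConverterCallerSketch {

    // Illustrative only: how a dispatcher-side caller could honor the contract.
    static void convertAndDispatch(ByteBuf buffer, List<MessageConverter> converters) {
        for (MessageConverter converter : converters) {
            buffer.markReaderIndex();      // accept() may move the reader index
            boolean accepted = converter.accept(buffer);
            buffer.resetReaderIndex();
            if (!accepted) {
                continue;
            }
            // convert() either retains and returns the original buffer or allocates
            // a new one; in both cases the returned buffer must be released here.
            ByteBuf converted = converter.convert(buffer);
            try {
                dispatch(converted);       // hypothetical dispatch step
            } finally {
                converted.release();
            }
            return;                        // the first accepting converter wins
        }
        dispatch(buffer);                  // no converter accepted: dispatch as-is
    }

    static void dispatch(ByteBuf payload) {
        // Placeholder for sending the payload to the consumer.
    }
}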

Then a new configuration is added:

    @FieldContext(
            category = CATEGORY_PLUGIN,
            doc = "List of message converters, which are responsible to convert entries before dispatching. If multiple"
                    + " converters are accepted for the same payload, the previous one in this list is preferred."
    )
    private List<String> messageConverters;
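
For example, enabling a single converter in broker.conf could look like this (the class name below is hypothetical):

# broker.conf
messageConverters=org.example.kop.KafkaMessageConverter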

Implementation

For MessageConverter, a MessageConverterValidator class is added to validate whether an implementation is valid.

The implementation is simple. When the broker starts, it loads all classes that implement the MessageConverter interface from the messageConverters config. Then we pass the converters to ServerCnx. Each time a dispatcher dispatches messages to a consumer, it eventually calls the ServerCnx#newMessageAndIntercept method, in which we can perform the conversion.
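
A minimal sketch of that loading step, assuming the converters are instantiated by reflection (the helper name loadConverters is illustrative, not the actual broker code):

import java.util.ArrayList;
import java.util.List;

public final class MessageConverterLoader {

    // Instantiate and initialize each converter class listed in messageConverters.
    public static List<MessageConverter> loadConverters(List<String> classNames) throws Exception {
        List<MessageConverter> converters = new ArrayList<>();
        for (String className : classNames) {
            Class<?> clazz = Class.forName(className);
            if (!MessageConverter.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " does not implement MessageConverter");
            }
            MessageConverter converter = (MessageConverter) clazz.getDeclaredConstructor().newInstance();
            converter.init(); // the contract says init() is called only once
            converters.add(converter);
        }
        return converters;
    }
}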

NOTE

This implementation needs to be improved because it requires the message payload to have MessageMetadata as its header. Otherwise, it could fail in AbstractBaseDispatcher#filterEntriesForConsumer before sending to the consumer. But we can still find some way to pass messageConverters to a dispatcher.

For unit tests, we can test the following converters (a sketch of EchoConverter follows the list):

  1. RejectAllConverter: accept returns false so that no conversion is performed.
  2. EchoConverter: accept returns true and convert simply returns the original buffer.
  3. BytesConverter: an example of a real-world converter. The message format has a MessageMetadata part carrying the entry.format=bytes property, and the payload part is only the raw bytes without SingleMessageMetadata. BytesConverter#convert converts the raw bytes to a format that Pulsar consumers can recognize.
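
A minimal sketch of EchoConverter, which also illustrates the reference-counting rule from the javadoc: when returning the original buffer, retain it once, because the caller always releases the returned buffer.

import io.netty.buffer.ByteBuf;

public class EchoConverter implements MessageConverter {

    @Override
    public boolean accept(ByteBuf buffer) {
        return true; // accept every buffer
    }

    @Override
    public ByteBuf convert(ByteBuf originalBuffer) {
        // Returning the original buffer itself, so increase its refcount by 1:
        // the caller will call release() on the returned value in any case.
        return originalBuffer.retain();
    }
}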
@BewareMyPower BewareMyPower self-assigned this Sep 8, 2021
@BewareMyPower BewareMyPower added the type/PIP label and removed the type/enhancement label Sep 8, 2021
@eolivelli
Contributor

Do we need to add a Converter on the publish side?
IIUC this Converter is for the Consumer.

@eolivelli
Contributor

We should define some constraints on what the Converter can do with the input ByteBuf. Should it keep the reader/writer indexes? What about the refcount?
Can the Converter modify the ByteBuf in place, or should it always create a copy?

@eolivelli
Contributor

I would add an "init(ServerConfiguration configuration)" method and a close() method, in order to make it configurable and to let it release resources.

@BewareMyPower
Contributor Author

BewareMyPower commented Sep 8, 2021 via email

@BewareMyPower
Contributor Author

I would add an "init(ServerConfiguration configuration)" method and a close() method, in order to make it configurable and to let it release resources.

Thanks for your suggestion. It makes sense to me. When we need to load converters from other systems, the load and release phases are necessary.

@wangjialing218
Contributor

If I use a Kafka client to produce messages and a Pulsar client to consume messages, the performance of message publishing could be improved, but the performance of message dispatch may be degraded.
If the topic has multiple subscriptions, one message could be dispatched to multiple consumers; does the conversion of this message happen multiple times in this case?

@BewareMyPower
Contributor Author

BewareMyPower commented Sep 8, 2021

@wangjialing218 The conversion is unavoidable. We have to sacrifice either the producer or the consumer side. But before this proposal, if we wanted to support both Kafka and Pulsar clients, the conversion was:

  1. Pulsar producer -> BK
  2. Kafka producer -> convert to Pulsar format (by KoP) -> BK
  3. BK -> convert to Kafka format (by KoP) -> Kafka consumer
  4. BK -> Pulsar consumer

After this proposal:

  1. Pulsar producer -> BK
  2. Kafka producer -> BK
  3. BK -> convert if necessary (by KoP) -> Kafka consumer
  4. BK -> convert if necessary (by MessageConverter) -> Pulsar consumer

For your second question

If the topic has multiple subscriptions, one message could be dispatched to multiple consumers; does the conversion of this message happen multiple times in this case?

Currently yes. But I'm not sure whether the implementation can be improved. In my demo implementation, I performed the conversion in ServerCnx#newMessageAndIntercept, but maybe it could be performed immediately after the entries are read.

@BewareMyPower
Contributor Author

BTW, here is my demo implementation: https://github.com/BewareMyPower/pulsar/tree/bewaremypower/payload-converter (it will be deleted after pushing the final implementation). The unit tests were not pushed, but I've added them in my local environment. And the interface was also a little different.

@BewareMyPower
Contributor Author

We should define some constraints on what the Converter can do with the input ByteBuf. Should it keep the reader/writer indexes? What about the refcount?
Can the Converter modify the ByteBuf in place, or should it always create a copy?

Yes. I'll add more description.

@wangjialing218
Contributor

@BewareMyPower Just an idea. Currently there is one ManagedLedger (ledger) associated with a PersistentTopic. Could we add another ManagedLedger (kafkaLedger) associated with the topic?

  1. Pulsar producer -> ledger
  2. Kafka producer -> kafkaLedger
  3. ledger -> Pulsar consumer
  4. kafkaLedger -> Kafka consumer
  5. ledger -> convert from pulsar format to kafka format -> kafkaLedger
  6. kafkaLedger -> convert from kafka format to pulsar format -> ledger

5 and 6 run as background tasks, so the conversion is done only once, asynchronously. It will cost more disk space but improve message throughput.

@BewareMyPower
Contributor Author

@wangjialing218 It could be an enhancement. I think we can submit another PIP for the design details. It sounds like a solution where each PersistentTopic has multiple ManagedLedgers and each ManagedLedger has its own message converter. But I think we can still use only one ManagedLedger, because MessageConverter#accept can distinguish whether the buffer needs to be converted.

For example, in KoP there's still a MessageMetadata with an entry.format=kafka property added before the original Kafka records. The overhead is not significant IMO. We can easily tell whether a message came from a Kafka client or a Pulsar client (see the sketch below).
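
To illustrate, an accept() along these lines could check that property in the metadata header. This is a hedged sketch: Commands.parseMessageMetadata is the existing broker-side helper for reading the header, but the property name and the KoP behavior assumed here come from this discussion, not from a final implementation.

import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;

public class KafkaFormatAcceptSketch implements MessageConverter {

    @Override
    public boolean accept(ByteBuf buffer) {
        // Note: parseMessageMetadata advances the reader index past the header,
        // which is why the caller should mark the reader index beforehand.
        MessageMetadata metadata = Commands.parseMessageMetadata(buffer);
        for (KeyValue kv : metadata.getPropertiesList()) {
            if ("entry.format".equals(kv.getKey()) && "kafka".equals(kv.getValue())) {
                return true;
            }
        }
        return false;
    }

    @Override
    public ByteBuf convert(ByteBuf originalBuffer) {
        // The real Kafka-records-to-Pulsar conversion is beyond this sketch.
        throw new UnsupportedOperationException("sketch only");
    }
}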

@eolivelli BTW, I just found a problem with @wangjialing218's idea. If we passed ServiceConfiguration to MessageConverter#init, we would have to move it from the pulsar-common module to the pulsar-broker-common module. If we did that, the managed-ledger module could not use the converter. There's another problem: we would have to define a class derived from ServiceConfiguration, like what protocol handlers did. If we don't need access to the existing configurations in ServiceConfiguration, the param is unnecessary.

Here is an example of a converter that needs to initialize some resources:

KeyMessageConverter converter = new KeyMessageConverter(remoteServerAddress);
converter.init(); // request key from remote server once or periodically
// call accept() and convert() using the dynamically changed key...
converter.close(); // close the connection to key server

@BewareMyPower
Contributor Author

I've updated the MessageConverter API and added the validator to the implementation. PTAL again @eolivelli

@codelipenghui
Contributor

@BewareMyPower

Once a converter is configured, before messages are dispatched to Pulsar consumers, the converter checks whether the buffer needs to be converted and performs the conversion if necessary.

We should avoid converting (serializing and deserializing) the data at the broker side; this will put a very heavy burden on the broker GC. In my opinion, we should do the data conversion at the client side. We can have diverse data format implementations, and by default the Pulsar client only has the Pulsar data format processor. If users want to consume data in the Kafka format, they can add a separate dependency such as org.apache.pulsar:kafka-format-converter.

The data sent to the Kafka client should not be a concern of the Pulsar broker; KoP should handle it. If the data is in the Kafka format, KoP can send it directly to the Kafka consumer; if the data is in the Pulsar format, KoP needs to convert it to the Kafka format.

For the storage layer (BookKeeper and tiered storage), the data might be read directly, bypassing the broker, such as by Pulsar SQL or Flink (in the future). This also has to be considered if we do the data conversion at the broker side: we might need another implementation to read data in multiple formats from BookKeeper/tiered storage.

@wangjialing218

Just an idea. Currently there is one ManagedLedger (ledger) associated with a PersistentTopic. Could we add another ManagedLedger (kafkaLedger) associated with the topic?

We can't use multiple managed ledgers for a topic; this would break the FIFO semantics if Kafka producers and Pulsar producers publish data to the topic while Kafka consumers and Pulsar consumers consume data from it.
And to read data repeatedly, we need to ensure the same reading order.

@eolivelli

Do we need to add a Converter on the publish side?

If KoP wants to convert the data on the publish path, KoP (or other protocol handlers) can implement that directly; is there any reason to introduce the converter in the broker for data publishing? And I think if using the kafka format for KoP, KoP will convert the data on the publish side for now; this is an inefficient way.

@eolivelli
Contributor

@codelipenghui

If users want to consume data in the Kafka format, they can add a separate dependency such as org.apache.pulsar:kafka-format-converter.

This is not always possible, because downstream users may not be aware of the format of the data.
It should be something like compression: if you want to do it in the client, you must put metadata in the message and let the client apply the converter depending on the format.

BTW, this will be very hard in big enterprises: requiring a client-side plugin to be added to every possible client.

@codelipenghui
Contributor

@eolivelli Both KoP and the client-side converter are optional plugins, not required. If users tend to convert the data at the broker side, the existing KoP implementation already supports this requirement. They can just convert the data at the publish side; there is no need to convert it multiple times on the consumption side.

I think we need to clarify the purpose of this proposal: we are trying to find a way to support efficient data conversion between different formats such as Kafka and Pulsar. The initial motivation is that we have seen performance issues in KoP before because we did the data conversion at the broker side, so the data format option (Kafka or Pulsar) was introduced in KoP to avoid the data conversion.

This will not prevent users from using broker-side data conversion; it just provides a more efficient way to handle the data conversion. It is just one more choice for users, but if we do the data conversion at the broker side, users can only choose one solution. (Essentially, there is no difference between a publishing converter and a consumption converter, and the consumption converter is more expensive than the publishing side, as @wangjialing218 mentioned in #11962 (comment).)

@BewareMyPower
Contributor Author

@codelipenghui The GC problem for KoP is caused by heap memory usage, not the conversion; the converter only affects CPU theoretically, because all memory used by messages comes from direct memory.

I think we need to clarify the purpose of this proposal: we are trying to find a way to support efficient data conversion between different formats such as Kafka and Pulsar.

It's not. Introducing the data format option in KoP is indeed to avoid the data conversion, because we tend to push users to use the pure Kafka client when using KoP. However, if they want to switch to the Pulsar client later, it could be impossible, because there's no way for a Pulsar consumer to consume messages produced by a Kafka producer.

Actually, it's right that the conversion should be performed at the client side. But upgrading the client might not always be easy. @eolivelli was right that this is not always possible because downstream users may not be aware of the format of the data.

If users tend to convert the data at the broker side, the existing KoP implementation already supports this requirement.

No. It requires configuring entry.format=pulsar, and then the interaction among Kafka clients could have bad performance.

In conclusion, supporting the message converter at both the broker level and the client level is necessary. Each has its own pros and cons:

  • Broker support: all Pulsar clients are supported, but the conversion overhead might affect the whole broker.
  • Client support: only the latest Pulsar client (e.g. 2.9.0) is supported, and users must upgrade their client version, but the conversion overhead won't affect the whole broker.

It's a priority comparison: availability vs. performance. I chose availability, so I tried to implement the message converter at the broker side first. The message converter here could also be applied to the Pulsar client. I'm not sure whether it's proper to contain two tasks in one PIP. If it is, I can add the client-side support to this proposal.

@BewareMyPower
Contributor Author

BewareMyPower commented Sep 8, 2021

@codelipenghui You can see streamnative/kop#673 for more details about the GC problem. In that PR, I used the direct memory allocator and the GC problem was fixed. The remaining heap memory usage is a KoP-side problem, not related to data format conversion. Even before this PR, deserializing Kafka records didn't use heap memory.

  1. Kafka records to Pulsar message
     • Read records (no memory allocated)
     • Create Pulsar message (memory allocated from direct memory)
  2. Pulsar message to Kafka records
@codelipenghui
Contributor

@BewareMyPower

No. It requires configuring entry.format=pulsar, and then the interaction among Kafka clients could have bad performance.

Because we are converting the Kafka format data to the Pulsar format, right? If yes, what is the difference when we convert Kafka format data to Pulsar format data on the consumption path?

@codelipenghui
Contributor

If using entry.format=pulsar:

  • for the Kafka client publish: kafka format -> pulsar format; for the Kafka client consume: pulsar format -> kafka format (improved by streamnative/kop#673)
  • for the Pulsar client consume: pulsar format -> pulsar format

After introducing the message converter and entry.format=kafka:

  • for the Kafka client publish: kafka format -> kafka format; for the Kafka client consume: kafka format -> kafka format
  • for the Pulsar client consume: kafka format -> pulsar format (by the converter)

Is my understanding above correct, @BewareMyPower?

I said the existing KoP implementation already supports this requirement, meaning that if we use entry.format=pulsar, the Kafka client can consume data written by the Pulsar client, and the Pulsar client can consume data from the Kafka client with some conversion at the broker side. I did not understand why the broker-side converter approach would have a great advantage over entry.format=pulsar.

@BewareMyPower
Contributor Author

@codelipenghui The fact is that most existing KoP users prefer entry.format=kafka. If we changed entry.format from kafka to pulsar, all existing topics/data would become unavailable, because the entries are all in the Kafka format.

  • Pulsar consumers cannot recognize these entries without PIP 94.
  • KoP itself cannot recognize these entries, because KoP would assume they are in Pulsar format, so Kafka consumers could not receive them.

Even without these concerns, you can see the following table for a comparison.

| Performance | entry.format=kafka | entry.format=pulsar |
| --- | --- | --- |
| Kafka Producer & Kafka Consumer | High | Extremely low |
| Kafka Producer & Pulsar Consumer | Not Available | Low |
| Pulsar Producer & Pulsar Consumer | High | High |
| Pulsar Producer & Kafka Consumer | Low | Low |

After PIP 94, the Not Available cell would become Low. The main difference is that, for all Kafka producers, entry.format=pulsar always performs the conversion even if the consumers are also Kafka consumers, whereas entry.format=kafka performs conversions only when necessary on the consumer side.

| Entry's format | Kafka Consumer | Pulsar Consumer |
| --- | --- | --- |
| Pulsar | Conversion | No conversion |
| Kafka | No conversion | Conversion |

After PIP 94, the No conversion cell in the first row would become a check of the entry buffer.

@BewareMyPower
Contributor Author

If a user only has Kafka clients for now, they can choose entry.format=kafka for high performance. If one day they switch to Pulsar clients, they can configure message converters to consume the old data.

If a user already has Kafka clients and Pulsar clients for the same topics, they must choose entry.format=pulsar now, without PIP 94. In the future, when they eliminate all Kafka clients, even including KoP itself, they'll find there's no way to consume the old data.

@wangjialing218
Contributor

@wangjialing218

Just an idea. Currently there is one ManagedLedger (ledger) associated with a PersistentTopic. Could we add another ManagedLedger (kafkaLedger) associated with the topic?

We can't use multiple managed ledgers for a topic; this would break the FIFO semantics if Kafka producers and Pulsar producers publish data to the topic while Kafka consumers and Pulsar consumers consume data from it.
And to read data repeatedly, we need to ensure the same reading order.

@codelipenghui We could consider how to keep the FIFO semantics when using multiple managed ledgers. For example, only one primary ledger is writable (storing the original messages from producers), while the other ledgers are readable (storing the conversion result for each message from the primary ledger), and the message order is kept the same in all ledgers. A consumer could select the primary ledger or one readable ledger to consume messages from.

I have also considered converting messages at the broker level with another motivation (not only for protocol handlers).
Currently, if we want to do some customized message conversion before sending messages to consumers, we can use a Pulsar Function to consume messages from the original topic, do the conversion, and publish the result to another topic. This is an inefficient approach that costs much more network and storage.
And if the conversion is something like removing sensitive information from the original message, which should not be sent to a particular consumer, it's not suitable to do the conversion at the consumer side.

The purpose of multiple managed ledgers for a topic is to do message conversion asynchronously. This will cost more storage, but there is no need to sacrifice performance on either the producer or the consumer side.

@codelipenghui
Contributor

If a user only has Kafka clients for now, they can choose entry.format=kafka for high performance. If one day they switch to Pulsar clients, they can configure message converters to consume the old data.
If a user already has Kafka clients and Pulsar clients for the same topics, they must choose entry.format=pulsar now, without PIP 94. In the future, when they eliminate all Kafka clients, even including KoP itself, they'll find there's no way to consume the old data.

@BewareMyPower
Yes, I know this case. With entry.format=pulsar, the message publishing side burdens the broker; with PIP-94, consuming from the Pulsar client burdens the broker. In my opinion, it is difficult for users to choose: if you have only one subscription reading data from the topic with the Pulsar client, you might choose the broker converter, but if you have 100 subscriptions reading data from the topic with the Pulsar client, you might want to choose entry.format=pulsar.

And if we try to introduce the data format processor at the client side again, users will have 3 options: entry.format for KoP, messageConverters for the broker, and the data format processor for the client side. This will make it very complicated to use.

In my opinion,

  1. If users want to achieve high performance, they should avoid broker-side data format conversion.
  2. If users do not care about the performance, they can use entry.format=pulsar; no need to add more dependencies at the client side.

The proposal looks like an intermediate approach for users who want to get high performance. I agree PIP-94 can bring performance improvements in certain scenarios, but considering the complexity and the broker resource consumption that PIP-94 might bring to Pulsar, we need to think about it carefully.

@codelipenghui
Contributor

@wangjialing218

We could consider how to keep the FIFO semantics when using multiple managed ledgers. For example, only one primary ledger is writable (storing the original messages from producers), while the other ledgers are readable (storing the conversion result for each message from the primary ledger), and the message order is kept the same in all ledgers. A consumer could select the primary ledger or one readable ledger to consume messages from.

@wangjialing218 This looks like it needs to copy data between different managed ledgers? And we would have multiple copies across the primary managed ledger and the other managed ledgers.

I have also considered converting messages at the broker level with another motivation (not only for protocol handlers).
Currently, if we want to do some customized message conversion before sending messages to consumers, we can use a Pulsar Function to consume messages from the original topic, do the conversion, and publish the result to another topic. This is an inefficient approach that costs much more network and storage.

I think it's not the same as PIP-94, right? PIP 94 converts the data encoding format; it will not touch the user's data. If you want to convert the user's data, the broker needs to deserialize it, which will bring more GC workload to the broker. Yes, doing data conversion on the broker side can reduce the network workload, but it increases the CPU workload and the burden on the JVM GC; you might get a more unstable broker.

The purpose of multiple managed ledgers for a topic is to do message conversion asynchronously. This will cost more storage, but there is no need to sacrifice performance on either the producer or the consumer side.

But there are also many disadvantages: for each managed ledger we need to maintain the metadata, and there are more data copies and more entries written to bookies.

@BewareMyPower
Contributor Author

@codelipenghui

In my opinion,

  1. If users want to achieve high performance, they should avoid broker-side data format conversion.
  2. If users do not care about the performance, they can use entry.format=pulsar; no need to add more dependencies at the client side.

That's right. But this proposal is mainly for the first case, not the second. What if the user switches to the Pulsar client later? Without the converter, they have to discard the old messages. If that's not acceptable, they would never consider switching to the Pulsar client.

but if you have 100 subscriptions reading data from the topic with the Pulsar client, you might want to choose entry.format=pulsar.

That's right, because in this case users prefer to store messages in the Pulsar format. However, as I've said, the message converter is mainly for the entry.format=kafka case, i.e. users prefer to store messages in the Kafka format, but in some cases they also need Pulsar consumers to read these messages.

Both the kafka and pulsar entry formats bring burdens to the broker, but we must ensure that the interaction between Kafka clients and Pulsar clients works in both cases.

And if we try to introduce the data format processor at the client side again, users will have 3 options: entry.format for KoP, messageConverters for the broker, and the data format processor for the client side. This will make it very complicated to use.

  1. entry.format=pulsar if you don't care about performance.
  2. entry.format=kafka for better performance among Kafka clients.
     If you still want to consume these messages with the Pulsar client, configure the message converter
     1. at the client level if you can upgrade your Pulsar client to >= 2.9.0;
     2. at the broker level if you have some older Pulsar clients and cannot upgrade for some reason.

We should cover all use cases.

@wangjialing218
Contributor

@codelipenghui

I think it's not the same as PIP-94, right? PIP 94 converts the data encoding format; it will not touch the user's data. If you want to convert the user's data, the broker needs to deserialize it, which will bring more GC workload to the broker. Yes, doing data conversion on the broker side can reduce the network workload, but it increases the CPU workload and the burden on the JVM GC; you might get a more unstable broker.

Yes, I mean converting the user's data on the broker side. Thanks for your advice; I'll consider the disadvantages and see if there is a solution.

But there are also many disadvantages: for each managed ledger we need to maintain the metadata, and there are more data copies and more entries written to bookies.

Could we record the primary ledger id in the other managed ledgers' metadata and write only one data copy to bookies? If the data is lost, we read the data from the primary ledger (which has multiple copies) with the same entry id and do the conversion again.

@codelipenghui
Contributor

codelipenghui commented Sep 9, 2021

@BewareMyPower Users should avoid using entry.format=kafka if they consider consuming data from Pulsar clients in the future, until we provide a way to consume Kafka format data in the Pulsar client. And the pulsar format is the default option for KoP: https://github.com/streamnative/kop/blob/330c3bf084104a548eb28f310860ac519c45d999/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java#L336.

In any case, if users want to get better performance, they need to make some changes, either broker-side or client-side. Why not consider a more efficient way? In other words, if they change the format to kafka, users are looking forward to high performance; but after we increase the burden on the broker, will this meet the users' performance requirements?

If users set up an incorrect format before, they should do the topic migration before we provide the solution.

@BewareMyPower
Contributor Author

@codelipenghui The default entry.format is pulsar just because there was previously no way for Pulsar clients and Kafka clients to interact when entry.format is kafka. streamnative/kop#632 fixed a part of this problem so that Kafka consumers can consume messages from Pulsar producers. But if we want Pulsar consumers to consume messages from Kafka producers, we need to fix the broker side. If we have PIP 94, things become easy: entry.format=pulsar would be deprecated. Regarding the case where there are multiple subscriptions on the same topic, it's just a bad vs. worse comparison.

after we increase the burden on the broker, will this meet the users' performance requirements?

It's still the priority comparison I mentioned before: availability vs. performance. We should tell users not to use older Pulsar clients. I accept both cases; we just need to change PIP 94 to "Message converter at client level". We still have the converter, but it would only be added to the client's config.

If users set up an incorrect format before, they should do the topic migration before we provide the solution.

It's nearly impossible. The entry.format config is broker level. For example, if you have N messages with entry.format=pulsar, you would have to read them all with a Pulsar consumer and persist them somewhere, then change entry.format to kafka, restart the broker, and finally retrieve the messages and publish them to new topics. Changing entry.format from kafka to pulsar is the same.

@BewareMyPower
Contributor Author

I accept both cases; we just need to change PIP 94 to "Message converter at client level". We still have the converter, but it would only be added to the client's config.

I just thought of a problem. If we add the converter on the client side, only the Java client (>= 2.9.0) will be able to consume these messages. There is still no way for other clients to consume them. We would need to write converters for the other language clients, which could take much effort because we cannot reuse the Java classes.

@BewareMyPower
Contributor Author

After discussing with @codelipenghui and @hangc0276, I think it's better to add the converter on the client side. The main reason is that we cannot find a good place in the broker implementation to insert the converter.

Currently, we would have to convert entries in the callback of asyncReadEntries, like:

    public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
        ReadType readType = (ReadType) ctx;
        if (readType == ReadType.Normal) {
            havePendingRead = false;
        } else {

After inserting the converter, it would look like:

    public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
        entries = converterList.convert(entries); // wrapper of converter's methods
        ReadType readType = (ReadType) ctx;

readEntriesComplete runs in a single thread. Even if convert took 1 millisecond each time (for an actual Kafka entry converter, it could take 5 or more milliseconds), the thread could be blocked and all other topics could be affected. It would be dangerous to configure the converter in the broker.

I'll open a new PIP to add a message converter for the Java client. For other clients, it may need extra effort.

/cc @eolivelli

@eolivelli
Contributor

I'll open a new PIP to add a message converter for the Java client. For other clients, it may need extra effort.

Makes sense to me.
You could probably push this PIP to the wiki and tag it as "Rejected";
this way it will stay as docs for the future.

cc @merlimat
