General message engineering in Producer #559

Merged (11 commits, Jul 25, 2023)
64 changes: 61 additions & 3 deletions python/rocketmq/client.py
@@ -14,18 +14,46 @@
# limitations under the License.

import threading
import asyncio
from typing import Set

from protocol import service_pb2
from protocol.service_pb2 import QueryRouteRequest
from protocol import service_pb2, definition_pb2
from protocol.service_pb2 import QueryRouteRequest, HeartbeatRequest
from rocketmq.client_config import ClientConfig
from rocketmq.log import logger
from rocketmq.client_id_encoder import ClientIdEncoder
from rocketmq.definition import TopicRouteData
from rocketmq.definition import TopicRouteData, Resource
from rocketmq.rpc_client import Endpoints, RpcClient
from rocketmq.session import Session
from rocketmq.signature import Signature


class ScheduleWithFixedDelay:
def __init__(self, action, delay, period):
self.action = action
self.delay = delay
self.period = period
self.task = None

async def start(self):
    # Wait for the initial delay, then run the action once per period.
    await asyncio.sleep(self.delay)
    while True:
        try:
            await self.action()
        except Exception as e:
            logger.error(f"Failed to execute scheduled task, exception: {e}")
        finally:
            await asyncio.sleep(self.period)

def schedule(self):
self.task = asyncio.create_task(self.start())

def cancel(self):
if self.task:
self.task.cancel()
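A minimal usage sketch of the scheduler above, assuming a hypothetical async callback named refresh (not part of this change):

import asyncio

async def refresh():
    # Placeholder for periodic work such as a route refresh or heartbeat.
    print("tick")

async def main():
    scheduler = ScheduleWithFixedDelay(refresh, delay=3, period=12)
    scheduler.schedule()   # run refresh() every 12 seconds after an initial 3-second delay
    await asyncio.sleep(30)
    scheduler.cancel()     # stop the periodic task

asyncio.run(main())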


class Client:
def __init__(self, client_config: ClientConfig, topics: Set[str]):
self.client_config = client_config
@@ -39,10 +67,34 @@ def __init__(self, client_config: ClientConfig, topics: Set[str]):
self.sessionsLock = threading.Lock()
self.client_manager = ClientManager(self)

self.isolated = dict()

async def start(self):
# get topic route
for topic in self.topics:
self.topic_route_cache[topic] = await self.fetch_topic_route(topic)
scheduler = ScheduleWithFixedDelay(self.heartbeat, 3, 12)
scheduler.schedule()

async def heartbeat(self):
    try:
        endpoints = self.GetTotalRouteEndpoints()
        request = HeartbeatRequest()
        request.client_type = definition_pb2.PRODUCER
        topic = Resource()
        topic.name = "normal_topic"
        # Collect the heartbeat invocation for every known endpoint into a map.
        invocations = {}
        for item in endpoints:
            task = await self.client_manager.heartbeat(item, request, self.client_config.request_timeout)
            invocations[item] = task
    except Exception as e:
        logger.error(f"[Bug] unexpected exception raised during heartbeat, clientId={self.client_id}, exception: {e}")

def GetTotalRouteEndpoints(self):
Reviewer (Member): Please use snake_case naming rather than camelCase naming (see the rename sketch after this method).

endpoints = set()
@@ -51,6 +103,12 @@ def GetTotalRouteEndpoints(self):
endpoints.add(endpoint)
return endpoints
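One possible way to act on the review comment above without breaking existing call sites (a sketch only, not part of this change):

def get_total_route_endpoints(self):
    # snake_case alias suggested in review; delegates to the existing method
    # until all call sites are migrated to the new name.
    return self.GetTotalRouteEndpoints()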

async def get_route_data(self, topic):
if topic in self.topic_route_cache:
return self.topic_route_cache[topic]
topic_route_data = await self.fetch_topic_route(topic=topic)
return topic_route_data

def get_client_config(self):
return self.client_config

12 changes: 12 additions & 0 deletions python/rocketmq/definition.py
@@ -21,9 +21,21 @@
from protocol.definition_pb2 import MessageType as ProtoMessageType
from protocol.definition_pb2 import Permission as ProtoPermission
from protocol.definition_pb2 import Resource as ProtoResource
from protocol.definition_pb2 import Encoding as ProtoEncoding
from rocketmq.protocol import definition_pb2
from rocketmq.rpc_client import Endpoints

class Encoding(Enum):
    IDENTITY = 0
    GZIP = 1

class EncodingHelper:
@staticmethod
def to_protobuf(mq_encoding):
if mq_encoding == Encoding.IDENTITY:
return ProtoEncoding.IDENTITY
elif mq_encoding == Encoding.GZIP:
return ProtoEncoding.GZIP
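A small sanity check of the helper above, using only the mapping defined in this diff:

# Map the client-side enum onto its protobuf counterpart before filling system properties.
assert EncodingHelper.to_protobuf(Encoding.IDENTITY) == ProtoEncoding.IDENTITY
assert EncodingHelper.to_protobuf(Encoding.GZIP) == ProtoEncoding.GZIP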

class Broker:
def __init__(self, broker):
47 changes: 47 additions & 0 deletions python/rocketmq/exponential_backoff_retry_policy.py
@@ -0,0 +1,47 @@
from datetime import timedelta
import math
from google.protobuf.duration_pb2 import Duration
from protocol.definition_pb2 import ExponentialBackoff as ProtoExponentialBackoff


class ExponentialBackoffRetryPolicy:
def __init__(self, max_attempts, initial_backoff, max_backoff, backoff_multiplier):
self._max_attempts = max_attempts
self.initial_backoff = initial_backoff
self.max_backoff = max_backoff
self.backoff_multiplier = backoff_multiplier

def get_max_attempts(self):
return self._max_attempts

def inherit_backoff(self, retry_policy):
if retry_policy.strategy_case != "ExponentialBackoff":
raise ValueError("Strategy must be exponential backoff")
return self._inherit_backoff(retry_policy.exponential_backoff)

def _inherit_backoff(self, retry_policy):
return ExponentialBackoffRetryPolicy(self._max_attempts,
retry_policy.initial.ToTimedelta(),
retry_policy.max.ToTimedelta(),
retry_policy.multiplier)

def get_next_attempt_delay(self, attempt):
delay_seconds = min(
self.initial_backoff.total_seconds() * math.pow(self.backoff_multiplier, 1.0 * (attempt - 1)),
self.max_backoff.total_seconds())
return timedelta(seconds=delay_seconds) if delay_seconds >= 0 else timedelta(seconds=0)

@staticmethod
def immediately_retry_policy(max_attempts):
return ExponentialBackoffRetryPolicy(max_attempts, timedelta(seconds=0), timedelta(seconds=0), 1)

def to_protobuf(self):
exponential_backoff = {
'Multiplier': self.backoff_multiplier,
'Max': Duration.FromTimedelta(self.max_backoff),
'Initial': Duration.FromTimedelta(self.initial_backoff)
}
return {
'MaxAttempts': self._max_attempts,
'ExponentialBackoff': exponential_backoff
}
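A short sketch of how delays grow under this policy; the numbers below are illustrative and are not the client defaults:

from datetime import timedelta

policy = ExponentialBackoffRetryPolicy(
    max_attempts=5,
    initial_backoff=timedelta(milliseconds=100),
    max_backoff=timedelta(seconds=1),
    backoff_multiplier=2.0,
)
for attempt in range(1, policy.get_max_attempts() + 1):
    # attempt 1 -> 0.1s, 2 -> 0.2s, 3 -> 0.4s, 4 -> 0.8s, 5 -> capped at 1.0s
    print(attempt, policy.get_next_attempt_delay(attempt).total_seconds())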
119 changes: 105 additions & 14 deletions python/rocketmq/producer.py
@@ -12,11 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import asyncio
import threading
from typing import Set

import time
import rocketmq
from rocketmq.client import Client
from rocketmq.client_config import ClientConfig
@@ -27,19 +27,21 @@
from rocketmq.protocol.definition_pb2 import Resource, SystemProperties
from rocketmq.protocol.service_pb2 import SendMessageRequest
from rocketmq.publish_settings import PublishingSettings
from rocketmq.exponential_backoff_retry_policy import ExponentialBackoffRetryPolicy
from rocketmq.rpc_client import Endpoints
from rocketmq.session_credentials import (SessionCredentials,
SessionCredentialsProvider)


from rocketmq.definition import PermissionHelper
from rocketmq.publishing_message import PublishingMessage
from rocketmq.message import Message
class PublishingLoadBalancer:
def __init__(self, topic_route_data: TopicRouteData, index: int = 0):
self.__index = index
self.__index_lock = threading.Lock()
message_queues = []
for mq in topic_route_data.message_queues:
if (
not mq.permission.is_writable()
not PermissionHelper().is_writable(mq.permission)
or mq.broker.id is not rocketmq.utils.master_broker_id
):
continue
@@ -89,9 +91,12 @@ def take_message_queues(self, excluded: Set[Endpoints], count: int):
class Producer(Client):
def __init__(self, client_config: ClientConfig, topics: Set[str]):
super().__init__(client_config, topics)
retry_policy = ExponentialBackoffRetryPolicy.immediately_retry_policy(10)
self.publish_settings = PublishingSettings(
self.client_id, self.endpoints, None, 10, topics
self.client_id, self.endpoints, retry_policy, 10, topics
)
self.publish_routedata_cache = {}


async def __aenter__(self):
await self.start()
@@ -108,25 +113,111 @@ async def shutdown(self):
logger.info(f"Begin to shutdown the rocketmq producer, client_id={self.client_id}")
logger.info(f"Shutdown the rocketmq producer successfully, client_id={self.client_id}")

async def send_message(self, message):
@staticmethod
def wrap_send_message_request(message, message_queue):
req = SendMessageRequest()
req.messages.extend([message])
topic_data = self.topic_route_cache["normal_topic"]
endpoints = topic_data.message_queues[2].broker.endpoints
return await self.client_manager.send_message(endpoints, req, 10)
req.messages.extend([message.to_protobuf(message_queue.queue_id)])
return req

async def send_message(self, message):
# get load balancer
publish_load_balancer = await self.get_publish_load_balancer(message.topic)
publishing_message = PublishingMessage(message, self.publish_settings)
retry_policy = self.get_retry_policy()
maxAttempts = retry_policy.get_max_attempts()

exception = None

candidates = (
publish_load_balancer.take_message_queues(set(self.isolated.keys()), maxAttempts)
if publishing_message.message.message_group is None else
[publish_load_balancer.take_message_queue_by_message_group(publishing_message.message.message_group)]
)
for attempt in range(1, maxAttempts + 1):
stopwatch_start = time.time()

candidateIndex = (attempt - 1) % len(candidates)
mq = candidates[candidateIndex]
# TODO: investigate why using `not in` for this check gives the wrong result.
if self.publish_settings.is_validate_message_type() and publishing_message.message_type.value != mq.accept_message_types[0].value:
    raise ValueError(
        "Current message type does not match with the accepted message types,"
        f" topic={message.topic}, actualMessageType={publishing_message.message_type},"
        f" acceptMessageType={mq.accept_message_types[0]}")

sendMessageRequest = self.wrap_send_message_request(publishing_message, mq)
# Send to the broker that owns the selected message queue.
endpoints = mq.broker.endpoints
try:
    invocation = await self.client_manager.send_message(endpoints, sendMessageRequest, self.client_config.request_timeout)
    # TODO: build a SendReceipt from the response instead of returning the raw invocation,
    # e.g. send_receipts = SendReceipt.process_send_message_response(mq, invocation)
    if attempt > 1:
        logger.info(
            f"Re-send message successfully, topic={message.topic},"
            f" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={self.client_id}")
    return invocation
except Exception as e:
exception = e
self.isolated[endpoints] = True
if attempt >= maxAttempts:
logger.info(f"Failed to send message finally, run out of attempt times, " +
f"topic={message.topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
f"endpoints={endpoints}, messageId={message.message_id}, clientId={self.client_id}")
raise

if message.message_type == "Transaction":
Reviewer (Member): An enumeration should be used here rather than the string literal (see the sketch after this method).

logger.info(f"Failed to send transaction message, run out of attempt times, " +
f"topic={message.topic}, maxAttempt=1, attempt={attempt}, " +
f"endpoints={endpoints}, messageId={message.message_id}, clientId={self.client_id}")
raise

if not isinstance(exception, TooManyRequestsException):
Reviewer (Member): Where is TooManyRequestsException defined? (A placeholder definition is sketched below, after the Producer helper methods.)

logger.info(f"Failed to send message, topic={message.topic}, maxAttempts={maxAttempts}, " +
f"attempt={attempt}, endpoints={endpoints}, messageId={message.message_id}," +
f" clientId={self.client_id}")
continue

nextAttempt = 1 + attempt
delay = retry_policy.get_next_attempt_delay(nextAttempt)
await asyncio.sleep(delay.total_seconds())
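Regarding the review comment on the transaction check above, a hedged sketch of what an enum-based comparison could look like; the MessageType enum and its numeric values here are illustrative and not part of this diff (if rocketmq.definition already provides such an enum, it should be reused instead):

from enum import Enum

class MessageType(Enum):
    # Hypothetical client-side enum mirroring the protobuf message types; values illustrative.
    NORMAL = 1
    FIFO = 2
    DELAY = 3
    TRANSACTION = 4

def is_transaction(message_type) -> bool:
    # Replaces the string comparison `message.message_type == "Transaction"`.
    return message_type == MessageType.TRANSACTION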


def update_publish_load_balancer(self, topic, topic_route_data):
publishing_load_balancer = None
if topic in self.publish_routedata_cache:
publishing_load_balancer = self.publish_routedata_cache[topic]
else:
publishing_load_balancer = PublishingLoadBalancer(topic_route_data)
self.publish_routedata_cache[topic] = publishing_load_balancer
return publishing_load_balancer

async def get_publish_load_balancer(self, topic):
if topic in self.publish_routedata_cache:
return self.publish_routedata_cache[topic]
topic_route_data = await self.get_route_data(topic)
return self.update_publish_load_balancer(topic, topic_route_data)

def get_settings(self):
return self.publish_settings

def get_retry_policy(self):
return self.publish_settings.GetRetryPolicy()
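On the earlier review question: TooManyRequestsException is not defined anywhere in this diff. A minimal placeholder it could refer to, hypothetical and modeled on the flow-control exception used by other RocketMQ clients, might look like this:

class TooManyRequestsException(Exception):
    # Hypothetical flow-control exception; the retry loop in send_message backs off
    # (instead of retrying immediately) when the failure is of this type.
    def __init__(self, message: str = "too many requests"):
        super().__init__(message)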

async def test():
credentials = SessionCredentials("username", "password")
credentials = SessionCredentials("user", "password")
credentials_provider = SessionCredentialsProvider(credentials)
client_config = ClientConfig(
endpoints=Endpoints("rmq-cn-jaj390gga04.cn-hangzhou.rmq.aliyuncs.com:8080"),
session_credentials_provider=credentials_provider,
ssl_enabled=True,
)
print(datetime.datetime.utcnow())
topic = Resource()
topic.name = "normal_topic"
msg = ProtoMessage()
@@ -137,9 +228,9 @@ async def test():
msg.system_properties.CopyFrom(sysperf)
logger.info(f"{msg}")
producer = Producer(client_config, topics={"normal_topic"})
message = Message(topic.name, msg.body)
await producer.start()
result = await producer.send_message(msg)
print(result)
await producer.send_message(message)


if __name__ == "__main__":
3 changes: 2 additions & 1 deletion python/rocketmq/publish_settings.py
@@ -22,6 +22,7 @@
from rocketmq.protocol.definition_pb2 import Resource as ProtoResource
from rocketmq.protocol.definition_pb2 import Settings as ProtoSettings
from rocketmq.rpc_client import Endpoints
from rocketmq.exponential_backoff_retry_policy import ExponentialBackoffRetryPolicy
from rocketmq.settings import (ClientType, ClientTypeHelper, IRetryPolicy,
Settings)
from rocketmq.signature import Signature
@@ -44,7 +45,7 @@ def __init__(
self,
client_id: str,
endpoints: Endpoints,
retry_policy: IRetryPolicy,
retry_policy: ExponentialBackoffRetryPolicy,
request_timeout: int,
topics: Dict[str, bool],
):