Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

General message engineering in Producer #559

Merged
merged 11 commits into from
Jul 25, 2023
306 changes: 279 additions & 27 deletions python/rocketmq/client.py

Large diffs are not rendered by default.

30 changes: 27 additions & 3 deletions python/rocketmq/client_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,49 @@


class ClientConfig:
"""Client configuration class which holds the settings for a client.
The settings include endpoint configurations, session credential provider and SSL settings.
An instance of this class is used to setup the client with necessary configurations.
"""

def __init__(
self,
endpoints: Endpoints,
session_credentials_provider: SessionCredentialsProvider,
ssl_enabled: bool,
):
#: The endpoints for the client to connect to.
self.__endpoints = endpoints

#: The session credentials provider to authenticate the client.
self.__session_credentials_provider = session_credentials_provider

#: A flag indicating if SSL is enabled for the client.
self.__ssl_enabled = ssl_enabled

#: The request timeout for the client in seconds.
self.request_timeout = 10

@property
def session_credentials_provider(self):
def session_credentials_provider(self) -> SessionCredentialsProvider:
"""The session credentials provider for the client.

:return: the session credentials provider
"""
return self.__session_credentials_provider

@property
def endpoints(self):
def endpoints(self) -> Endpoints:
"""The endpoints for the client to connect to.

:return: the endpoints
"""
return self.__endpoints

@property
def ssl_enabled(self):
def ssl_enabled(self) -> bool:
"""A flag indicating if SSL is enabled for the client.

:return: True if SSL is enabled, False otherwise
"""
return self.__ssl_enabled
19 changes: 18 additions & 1 deletion python/rocketmq/client_id_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,36 @@


class ClientIdEncoder:
"""This class generates a unique client ID for each client based on
hostname, process id, index and the monotonic clock time.
"""

#: The current index for client id generation.
__INDEX = 0

#: The lock used for thread-safe incrementing of the index.
__INDEX_LOCK = threading.Lock()

#: The separator used in the client id string.
__CLIENT_ID_SEPARATOR = "@"

@staticmethod
def __get_and_increment_sequence():
def __get_and_increment_sequence() -> int:
"""Increment and return the current index in a thread-safe manner.

:return: the current index after incrementing it.
"""
with ClientIdEncoder.__INDEX_LOCK:
temp = ClientIdEncoder.__INDEX
ClientIdEncoder.__INDEX += 1
return temp

@staticmethod
def generate() -> str:
"""Generate a unique client ID.

:return: the generated client id
"""
index = ClientIdEncoder.__get_and_increment_sequence()
return (
socket.gethostname()
Expand Down
104 changes: 104 additions & 0 deletions python/rocketmq/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import List

from protocol.definition_pb2 import Broker as ProtoBroker
from protocol.definition_pb2 import Encoding as ProtoEncoding
from protocol.definition_pb2 import MessageQueue as ProtoMessageQueue
from protocol.definition_pb2 import MessageType as ProtoMessageType
from protocol.definition_pb2 import Permission as ProtoPermission
Expand All @@ -25,20 +26,55 @@
from rocketmq.rpc_client import Endpoints


class Encoding(Enum):
"""Enumeration of supported encoding types."""
IDENTITY = 0
GZIP = 1


class EncodingHelper:
"""Helper class for converting encoding types to protobuf."""

@staticmethod
def to_protobuf(mq_encoding):
"""Convert encoding type to protobuf.

:param mq_encoding: The encoding to be converted.
:return: The corresponding protobuf encoding.
"""
if mq_encoding == Encoding.IDENTITY:
return ProtoEncoding.IDENTITY
elif mq_encoding == Encoding.GZIP:
return ProtoEncoding.GZIP


class Broker:
"""Represent a broker entity."""

def __init__(self, broker):
self.name = broker.name
self.id = broker.id
self.endpoints = Endpoints(broker.endpoints)

def to_protobuf(self):
"""Convert the broker to its protobuf representation.

:return: The protobuf representation of the broker.
"""
return ProtoBroker(
Name=self.name, Id=self.id, Endpoints=self.endpoints.to_protobuf()
)


class Resource:
"""Represent a resource entity."""

def __init__(self, name=None, resource=None):
"""Initialize a resource.

:param name: The name of the resource.
:param resource: The resource object.
"""
if resource is not None:
self.namespace = resource.ResourceNamespace
self.name = resource.Name
Expand All @@ -47,22 +83,34 @@ def __init__(self, name=None, resource=None):
self.name = name

def to_protobuf(self):
"""Convert the resource to its protobuf representation.

:return: The protobuf representation of the resource.
"""
return ProtoResource(ResourceNamespace=self.namespace, Name=self.name)

def __str__(self):
return f"{self.namespace}.{self.name}" if self.namespace else self.name


class Permission(Enum):
"""Enumeration of supported permission types."""
NONE = 0
READ = 1
WRITE = 2
READ_WRITE = 3


class PermissionHelper:
"""Helper class for converting permission types to protobuf and vice versa."""

@staticmethod
def from_protobuf(permission):
"""Convert protobuf permission to Permission enum.

:param permission: The protobuf permission to be converted.
:return: The corresponding Permission enum.
"""
if permission == ProtoPermission.READ:
return Permission.READ
elif permission == ProtoPermission.WRITE:
Expand All @@ -76,6 +124,11 @@ def from_protobuf(permission):

@staticmethod
def to_protobuf(permission):
"""Convert Permission enum to protobuf permission.

:param permission: The Permission enum to be converted.
:return: The corresponding protobuf permission.
"""
if permission == Permission.READ:
return ProtoPermission.READ
elif permission == Permission.WRITE:
Expand All @@ -87,29 +140,47 @@ def to_protobuf(permission):

@staticmethod
def is_writable(permission):
"""Check if the permission is writable.

:param permission: The Permission enum to be checked.
:return: True if the permission is writable, False otherwise.
"""
if permission in [Permission.WRITE, Permission.READ_WRITE]:
return True
else:
return False

@staticmethod
def is_readable(permission):
"""Check if the permission is readable.

:param permission: The Permission enum to be checked.
:return: True if the permission is readable, False otherwise.
"""
if permission in [Permission.READ, Permission.READ_WRITE]:
return True
else:
return False


class MessageType(Enum):
"""Enumeration of supported message types."""
NORMAL = 0
FIFO = 1
DELAY = 2
TRANSACTION = 3


class MessageTypeHelper:
"""Helper class for converting message types to protobuf and vice versa."""

@staticmethod
def from_protobuf(message_type):
"""Convert protobuf message type to MessageType enum.

:param message_type: The protobuf message type to be converted.
:return: The corresponding MessageType enum.
"""
if message_type == ProtoMessageType.NORMAL:
return MessageType.NORMAL
elif message_type == ProtoMessageType.FIFO:
Expand All @@ -123,6 +194,11 @@ def from_protobuf(message_type):

@staticmethod
def to_protobuf(message_type):
"""Convert MessageType enum to protobuf message type.

:param message_type: The MessageType enum to be converted.
:return: The corresponding protobuf message type.
"""
if message_type == MessageType.NORMAL:
return ProtoMessageType.NORMAL
elif message_type == MessageType.FIFO:
Expand All @@ -136,7 +212,13 @@ def to_protobuf(message_type):


class MessageQueue:
"""A class that encapsulates a message queue entity."""

def __init__(self, message_queue):
"""Initialize a MessageQueue instance.

:param message_queue: The initial message queue to be encapsulated.
"""
self._topic_resource = Resource(message_queue.topic)
self.queue_id = message_queue.id
self.permission = PermissionHelper.from_protobuf(message_queue.permission)
Expand All @@ -148,12 +230,24 @@ def __init__(self, message_queue):

@property
def topic(self):
"""The topic resource name.

:return: The name of the topic resource.
"""
return self._topic_resource.name

def __str__(self):
"""Get a string representation of the MessageQueue instance.

:return: A string that represents the MessageQueue instance.
"""
return f"{self.broker.name}.{self._topic_resource}.{self.queue_id}"

def to_protobuf(self):
"""Convert the MessageQueue instance to protobuf message queue.

:return: A protobuf message queue that represents the MessageQueue instance.
"""
message_types = [
MessageTypeHelper.to_protobuf(mt) for mt in self.accept_message_types
]
Expand All @@ -167,12 +261,22 @@ def to_protobuf(self):


class TopicRouteData:
"""A class that encapsulates a list of message queues."""

def __init__(self, message_queues: List[definition_pb2.MessageQueue]):
"""Initialize a TopicRouteData instance.

:param message_queues: The initial list of message queues to be encapsulated.
"""
message_queue_list = []
for mq in message_queues:
message_queue_list.append(MessageQueue(mq))
self.__message_queue_list = message_queue_list

@property
def message_queues(self) -> List[MessageQueue]:
"""The list of MessageQueue instances.

:return: The list of MessageQueue instances that the TopicRouteData instance encapsulates.
"""
return self.__message_queue_list
Loading
Loading