Skip to content

Commit

Permalink
Initial commit for Python client (#508)
Browse files Browse the repository at this point in the history
* add protocol layer code & proto file

* add rpcClient.py

* add .gitignore & add license & fix timeout

* Add python git ignore files

* Delete redendant IDL files from python directory

* Add license header for python files

* Add comments for unfinished works

---------

Co-authored-by: Aaron Ai <[email protected]>
  • Loading branch information
yanchaomei and aaron-ai committed May 11, 2023
1 parent c894d41 commit ec5c0ec
Show file tree
Hide file tree
Showing 9 changed files with 1,012 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ vendor/
golang/*.tests
golang/*.test
golang/*.exe

# Python
*.pyc
*.pyo
*.pyd
python/__pycache__/
125 changes: 125 additions & 0 deletions python/client/rpc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from grpc import ssl_channel_credentials, insecure_channel
from datetime import timedelta
import time
from grpc_interceptor import ClientInterceptor, ClientCallDetails
import protocol.service_pb2 as pb2
import protocol.service_pb2_grpc as servicegrpc

class MetadataInterceptor(ClientInterceptor):
def __init__(self, metadata):
self.metadata = metadata

def intercept(self, request, metadata, client_call_details, next):
metadata.update(self.metadata)
new_client_call_details = ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression,
)
return next(request, new_client_call_details)


class RpcClient:
CONNECT_TIMEOUT_MILLIS = 3*1000
GRPC_MAX_MESSAGE_SIZE = 2*31 - 1
def __init__(self, endpoints, sslEnabled):
channel_options = [
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
('grpc.keepalive_time_ms', 1000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.keepalive_permit_without_calls', True),
('grpc.connect_timeout_ms', self.CONNECT_TIMEOUT_MILLIS),
]
if sslEnabled:
ssl_creds = ssl_channel_credentials()
self.channel = Channel(endpoints.getGrpcTarget(), ssl_creds, options=channel_options)
else:
self.channel = insecure_channel(endpoints.getGrpcTarget(), options=channel_options)

self.activityNanoTime = time.monotonic_ns()

def get_stub(self, metadata):
interceptor = MetadataInterceptor(metadata)

interceptor_channel = grpc.intercept_channel(self.channel, interceptor)
stub = servicegrpc.MessagingServiceStub(interceptor_channel)
return stub

def __del__(self):
self.channel.close()

def idle_duration(activity_nano_time):
return timedelta(microseconds=(time.monotonic_ns() - activity_nano_time) / 1000)

async def query_route(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.QueryRoute(request, timeout=duration)

async def heartbeat(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.Heartbeat(request, timeout=duration)

async def send_message(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.SendMessage(request, timeout=duration)

async def query_assignment(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.QueryAssignment(request, timeout=duration)

# TODO: Not yet imeplemented
async def receive_message(self, metadata, request, duration):
pass

async def ack_message(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.AckMessage(request, timeout=duration)

async def change_invisible_duration(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.ChangeInvisibleDuration(request, timeout=duration)

async def forward_message_to_dead_letter_queue(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.ForwardMessageToDeadLetterQueue(request, timeout=duration)

async def endTransaction(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.EndTransaction(request, timeout=duration)


async def notifyClientTermination(self, metadata, request, duration):
self.activity_nano_time = time.monotonic_ns()
stub = self.get_stub(self, metadata)
return await stub.NotifyClientTermination(request, timeout=duration)

async def telemetry(self, metadata, duration, response_observer):
stub = self.get_stub(self, metadata)
return await stub.Telemetry(response_observer, timeout=duration)
14 changes: 14 additions & 0 deletions python/protocol/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
47 changes: 47 additions & 0 deletions python/protocol/admin_pb2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: admin.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61\x64min.proto\x12\x12\x61pache.rocketmq.v2\"\x95\x01\n\x15\x43hangeLogLevelRequest\x12>\n\x05level\x18\x01 \x01(\x0e\x32/.apache.rocketmq.v2.ChangeLogLevelRequest.Level\"<\n\x05Level\x12\t\n\x05TRACE\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x08\n\x04INFO\x10\x02\x12\x08\n\x04WARN\x10\x03\x12\t\n\x05\x45RROR\x10\x04\"(\n\x16\x43hangeLogLevelResponse\x12\x0e\n\x06remark\x18\x01 \x01(\t2r\n\x05\x41\x64min\x12i\n\x0e\x43hangeLogLevel\x12).apache.rocketmq.v2.ChangeLogLevelRequest\x1a*.apache.rocketmq.v2.ChangeLogLevelResponse\"\x00\x42=\n\x12\x61pache.rocketmq.v2B\x07MQAdminP\x01\xa0\x01\x01\xd8\x01\x01\xf8\x01\x01\xaa\x02\x12\x41pache.Rocketmq.V2b\x06proto3')

_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'admin_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:

DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\022apache.rocketmq.v2B\007MQAdminP\001\240\001\001\330\001\001\370\001\001\252\002\022Apache.Rocketmq.V2'
_CHANGELOGLEVELREQUEST._serialized_start=36
_CHANGELOGLEVELREQUEST._serialized_end=185
_CHANGELOGLEVELREQUEST_LEVEL._serialized_start=125
_CHANGELOGLEVELREQUEST_LEVEL._serialized_end=185
_CHANGELOGLEVELRESPONSE._serialized_start=187
_CHANGELOGLEVELRESPONSE._serialized_end=227
_ADMIN._serialized_start=229
_ADMIN._serialized_end=343
# @@protoc_insertion_point(module_scope)
81 changes: 81 additions & 0 deletions python/protocol/admin_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import admin_pb2 as admin__pb2


class AdminStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.ChangeLogLevel = channel.unary_unary(
'/apache.rocketmq.v2.Admin/ChangeLogLevel',
request_serializer=admin__pb2.ChangeLogLevelRequest.SerializeToString,
response_deserializer=admin__pb2.ChangeLogLevelResponse.FromString,
)


class AdminServicer(object):
"""Missing associated documentation comment in .proto file."""

def ChangeLogLevel(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_AdminServicer_to_server(servicer, server):
rpc_method_handlers = {
'ChangeLogLevel': grpc.unary_unary_rpc_method_handler(
servicer.ChangeLogLevel,
request_deserializer=admin__pb2.ChangeLogLevelRequest.FromString,
response_serializer=admin__pb2.ChangeLogLevelResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'apache.rocketmq.v2.Admin', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class Admin(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def ChangeLogLevel(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/apache.rocketmq.v2.Admin/ChangeLogLevel',
admin__pb2.ChangeLogLevelRequest.SerializeToString,
admin__pb2.ChangeLogLevelResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Loading

0 comments on commit ec5c0ec

Please sign in to comment.