1 # Copyright 2020 The gRPC Authors
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from concurrent.futures import ThreadPoolExecutor
18 from typing import Iterable
22 from google.protobuf.json_format import MessageToJson
28 def create_state_response(
29 call_state: phone_pb2.CallState.State) -> phone_pb2.StreamCallResponse:
30 response = phone_pb2.StreamCallResponse()
31 response.call_state.state = call_state
35 class Phone(phone_pb2_grpc.PhoneServicer):
39 self._lock = threading.RLock()
41 def _create_call_session(self) -> phone_pb2.CallInfo:
42 call_info = phone_pb2.CallInfo()
44 call_info.session_id = str(self._id_counter)
46 call_info.media = "https://link.to.audio.resources"
47 logging.info("Created a call session [%s]", MessageToJson(call_info))
50 def _clean_call_session(self, call_info: phone_pb2.CallInfo) -> None:
51 logging.info("Call session cleaned [%s]", MessageToJson(call_info))
54 self, request_iterator: Iterable[phone_pb2.StreamCallRequest],
55 context: grpc.ServicerContext
56 ) -> Iterable[phone_pb2.StreamCallResponse]:
58 request = next(request_iterator)
59 logging.info("Received a phone call request for number [%s]",
62 raise RuntimeError("Failed to receive call request")
63 # Simulate the acceptance of call request
65 yield create_state_response(phone_pb2.CallState.NEW)
66 # Simulate the start of the call session
68 call_info = self._create_call_session()
69 context.add_callback(lambda: self._clean_call_session(call_info))
70 response = phone_pb2.StreamCallResponse()
71 response.call_info.session_id = call_info.session_id
72 response.call_info.media = call_info.media
74 yield create_state_response(phone_pb2.CallState.ACTIVE)
75 # Simulate the end of the call
77 yield create_state_response(phone_pb2.CallState.ENDED)
78 logging.info("Call finished [%s]", request.phone_number)
81 def serve(address: str) -> None:
82 server = grpc.server(ThreadPoolExecutor())
83 phone_pb2_grpc.add_PhoneServicer_to_server(Phone(), server)
84 server.add_insecure_port(address)
86 logging.info("Server serving at %s", address)
87 server.wait_for_termination()
90 if __name__ == "__main__":
91 logging.basicConfig(level=logging.INFO)