Comms
- class mosaic.comms.comms.CommsManager(runtime=None, address=None, port=None, context=None, loop=None)[source]
Bases: object
Objects of this type manage the connections and message passing between different runtimes.
- Parameters:
runtime (Runtime, optional) – Current runtime, defaults to global runtime.
address (str) – IP address of the connection.
port (int) – Port to use for the connection.
context (zmq.Context, optional) – ZMQ socket context, defaults to global context.
loop (EventLoop, optional) – Event loop to use, defaults to global event loop.
- property address
Connection address.
- async beat(sender_id)[source]
Received beat message; the remote endpoint is alive.
- Parameters:
sender_id (str) – Remote UID.
- async call(sender_id, method, reply, **kwargs)[source]
Run method in the loop.
- Parameters:
sender_id (str) – UID of the remote endpoint.
method (callable) – Method to execute.
reply (False or str) – Whether a reply is needed and, if so, the UID of the reply.
kwargs (optional) – Keyword arguments for the method.
- async call_safe(sender_id, method, reply, **kwargs)[source]
Run method in the loop, and within an exception handler that will process exceptions and send them back to the sender.
- Parameters:
sender_id (str) – UID of the remote endpoint.
method (callable) – Method to execute.
reply (False or str) – Whether a reply is needed and, if so, the UID of the reply.
kwargs (optional) – Keyword arguments for the method.
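The behaviour described for call_safe can be pictured with a small, self-contained sketch. This is not the mosaic implementation: call_safe_sketch, the outbox list, and the payload dictionaries are hypothetical stand-ins, and the sketch assumes that a reply of False means no result is sent back.

```python
import asyncio
import traceback

# Hypothetical stand-in for the outbound channel: (recipient, payload) pairs.
outbox = []


async def call_safe_sketch(sender_id, method, reply, **kwargs):
    """Run `method`; on failure, report the exception back to `sender_id`."""
    try:
        result = method(**kwargs)
        # Accept both plain callables and coroutine functions.
        if asyncio.iscoroutine(result):
            result = await result
    except Exception as exc:
        # Assumption: the error is serialised and returned to the sender.
        outbox.append((sender_id, {"error": repr(exc),
                                   "traceback": traceback.format_exc()}))
        return None
    if reply is not False:
        # `reply` carries the UID of the Reply waiting for this result.
        outbox.append((sender_id, {"reply": reply, "result": result}))
    return result


def divide(a, b):
    return a / b


asyncio.run(call_safe_sketch("remote-uid", divide, "reply-1", a=6, b=3))
asyncio.run(call_safe_sketch("remote-uid", divide, "reply-2", a=1, b=0))
```

After both calls, outbox holds one result payload and one error payload, both addressed to the sender.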
- cmd(*args, **kwargs)[source]
Synchronously send command to remote runtime.
For arguments and return values, see Comms.cmd_async.
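Many methods in this class come in a synchronous and an _async flavour. One common way to build such a pair, shown here only as a sketch and not as the way mosaic does it, is to submit the coroutine to an event loop running in another thread and block on the resulting future. The names cmd_sketch and cmd_async_sketch, and the arguments passed to them, are hypothetical.

```python
import asyncio
import threading

# Event loop running in a background thread, standing in for the runtime loop.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def cmd_async_sketch(*args, **kwargs):
    """Hypothetical asynchronous command: echoes what it was given."""
    await asyncio.sleep(0)  # yield to the loop, as real I/O would
    return {"args": args, "kwargs": kwargs}


def cmd_sketch(*args, **kwargs):
    """Synchronous wrapper: schedule the coroutine and wait for its result."""
    future = asyncio.run_coroutine_threadsafe(
        cmd_async_sketch(*args, **kwargs), loop
    )
    # `future` is a concurrent.futures.Future, so blocking here is safe
    # as long as the caller is not running inside `loop` itself.
    return future.result(timeout=5)


result = cmd_sketch("remote-uid", method="ping")

# Shut the background loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

The wrapper must be called from a thread other than the one running the loop; calling it from inside the loop would deadlock.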
- async cmd_async(*args, **kwargs)[source]
Send command with given arguments and keyword arguments.
- Parameters:
args (tuple, optional) – Any arguments for the message.
kwargs (optional) – Keyword arguments for the method.
- Returns:
A Reply if a reply is expected, otherwise None.
- Return type:
Reply or None
- cmd_pubsub(*args, **kwargs)[source]
Synchronously send command to remote runtime.
For arguments and return values, see Comms.cmd_async.
- async cmd_pubsub_async(*args, **kwargs)[source]
Send command with given arguments and keyword arguments.
- Parameters:
args (tuple, optional) – Any arguments for the message.
kwargs (optional) – Keyword arguments for the method.
- Returns:
A Reply if a reply is expected, otherwise None.
- Return type:
Reply or None
- cmd_recv(*args, **kwargs)[source]
Synchronously send command to remote runtime and wait for reply.
For arguments and return values, see Comms.cmd_async.
- async cmd_recv_async(*args, **kwargs)[source]
Send command with given arguments and keyword arguments, and then wait for the reply.
- Parameters:
args (tuple, optional) – Any arguments for the message.
kwargs (optional) – Keyword arguments for the method.
- Returns:
Result of the reply.
- Return type:
object
- async connect(sender_id, uid, address, port, notify=False)[source]
Create and connect outbound connection for a remote runtime, with a given address and port.
- Parameters:
sender_id (str) – Sender UID.
uid (str) – Remote UID.
address (str) – Remote address.
port (int) – Remote port.
notify (bool, optional) – Whether or not to notify others of a new connection, defaults to False.
- connect_pubsub(uid, address, port)[source]
Create and connect pub-sub connection for a remote runtime, with a given address and port.
- Parameters:
uid (str) – Remote UID.
address (str) – Remote address.
port (int) – Remote port.
- connect_send(uid, address, port)[source]
Create and connect outbound connection for a remote runtime, with a given address and port.
- Parameters:
uid (str) – Remote UID.
address (str) – Remote address.
port (int) – Remote port.
- async disconnect(sender_id, uid, notify=False)[source]
Disconnect a remote endpoint.
- Parameters:
sender_id (str) – Sender UID.
uid (str) – Remote UID to disconnect.
notify (bool, optional) – Whether or not to notify others of the disconnection, defaults to False.
- async hand(sender_id, address, port)[source]
Handle incoming handshake.
- Parameters:
sender_id (str) – Remote UID.
address (str) – Remote address.
port (int) – Remote port.
- async handshake(uid, address, port, pubsub_port=None)[source]
Start handshake with remote uid, located at a certain address and port.
- Parameters:
uid (str) – Remote UID.
address (str) – Remote address.
port (int) – Remote port.
pubsub_port (int, optional) – Publishing port.
- async heart(sender_id)[source]
Received heart message; respond with beat.
- Parameters:
sender_id (str) – Remote UID.
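The heart and beat handlers form a simple liveness check: one side sends heart, the other answers with beat, and the arrival of beat shows that the remote endpoint is alive. The in-memory sketch below illustrates that exchange only. EndpointSketch, send_heart, is_alive, and the timeout value are hypothetical, and direct method calls stand in for network messages.

```python
import time


class EndpointSketch:
    """Toy endpoint that exchanges heart/beat messages with its peers."""

    def __init__(self, uid):
        self.uid = uid
        self.peers = {}      # uid -> EndpointSketch (stand-in for sockets)
        self.last_seen = {}  # uid -> time of the last beat received

    def connect(self, other):
        self.peers[other.uid] = other

    def send_heart(self, uid):
        # Ask the remote endpoint to prove that it is alive.
        self.peers[uid].heart(self.uid)

    def heart(self, sender_id):
        # Received `heart`: respond with `beat`.
        self.peers[sender_id].beat(self.uid)

    def beat(self, sender_id):
        # Received `beat`: the remote endpoint is alive.
        self.last_seen[sender_id] = time.monotonic()

    def is_alive(self, uid, timeout=1.0):
        seen = self.last_seen.get(uid)
        return seen is not None and time.monotonic() - seen < timeout


a = EndpointSketch("a")
b = EndpointSketch("b")
a.connect(b)
b.connect(a)

alive_before = a.is_alive("b")  # no beat received yet
a.send_heart("b")
alive_after = a.is_alive("b")   # beat has arrived
```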
- listen()[source]
Start the listening loop.
- Returns:
Future associated with the running loop.
- Return type:
concurrent.futures.Future
- async listen_async(recv_async)[source]
Asynchronous listening loop.
The loop waits on messages from the incoming connection, then processes them and, if necessary, passes them to the runtime.
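A minimal sketch of such a listening loop, assuming that recv_async returns a (sender UID, message) pair as documented for CommsManager.recv_async. The None stop sentinel, listen_sketch, and the queue used as an inbound socket are hypothetical conveniences for the example.

```python
import asyncio


async def listen_sketch(recv_async, process_msg):
    """Wait for incoming messages and hand each one to `process_msg`."""
    while True:
        sender_id, msg = await recv_async()
        if msg is None:  # hypothetical sentinel that stops the loop
            break
        await process_msg(sender_id, msg)


async def main():
    inbox = asyncio.Queue()  # stand-in for the inbound socket
    handled = []

    async def process_msg(sender_id, msg):
        handled.append((sender_id, msg))

    for item in [("a", "hello"), ("b", "world"), ("a", None)]:
        inbox.put_nowait(item)

    # `inbox.get` plays the role of `recv_async`.
    await listen_sketch(inbox.get, process_msg)
    return handled


handled = asyncio.run(main())
```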
- property logger
Runtime logger.
- property port
Connection port.
- async process_msg(sender_id, msg)[source]
Process a received message to decide what to do with it.
- Parameters:
sender_id (str) – UID of the remote endpoint.
msg (Message) – Message object.
- property pubsub_port
Publishing port.
- recv(**kwargs)[source]
Synchronously receive message from remote runtime.
For arguments and return values, see Comms.recv_async.
- async recv_async()[source]
Wait for received message from the inbound socket.
- Returns:
str – Sender UID.
Message – Received message.
- recv_pubsub(**kwargs)[source]
Synchronously receive message from remote runtime.
For arguments and return values, see Comms.recv_async.
- async recv_pubsub_async()[source]
Wait for received message from the inbound pub-sub socket.
- Returns:
str – Sender UID.
Message – Received message.
- register_reply_future(future)[source]
Register a Reply to be accessible later on.
- Parameters:
future (Reply) – Reply to register.
- reply(sender_id, uid, result)[source]
Process reply from remote runtime.
- Parameters:
sender_id (str) – UID of the remote endpoint.
uid (str) – UID of the associated Reply.
result (object) – Result of the reply.
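Taken together, register_reply_future and reply suggest a registry of pending replies keyed by UID: a reply is registered when a message is sent, and resolved when the remote result comes back. The sketch below shows that pattern with plain asyncio futures. ReplySketch and ReplyRegistrySketch are hypothetical and do not reflect how the Reply type is actually built.

```python
import asyncio
import uuid


class ReplySketch:
    """Hypothetical pending reply: a UID plus a future for the result."""

    def __init__(self, loop):
        self.uid = uuid.uuid4().hex
        self.future = loop.create_future()


class ReplyRegistrySketch:
    def __init__(self):
        self._pending = {}  # reply UID -> ReplySketch

    def register_reply_future(self, future):
        # Make the reply accessible later on, by UID.
        self._pending[future.uid] = future

    def reply(self, sender_id, uid, result):
        # Resolve the matching pending reply, if it is still registered.
        pending = self._pending.pop(uid, None)
        if pending is not None and not pending.future.done():
            pending.future.set_result(result)


async def main():
    loop = asyncio.get_running_loop()
    registry = ReplyRegistrySketch()
    pending = ReplySketch(loop)
    registry.register_reply_future(pending)
    # Simulate the remote runtime answering on a later loop iteration.
    loop.call_soon(registry.reply, "remote-uid", pending.uid, 42)
    return await pending.future


value = asyncio.run(main())
```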
- send(*args, **kwargs)[source]
Synchronously send message to remote runtime.
For arguments and return values, see Comms.send_async.
- async send_async(send_uid, *args, **kwargs)[source]
Send message to send_uid with given arguments and keyword arguments.
- Parameters:
send_uid (str) – UID of the remote runtime.
args (tuple, optional) – Any arguments for the message.
kwargs (optional) – Keyword arguments for the method.
- Returns:
A Reply if a reply is expected, otherwise None.
- Return type:
Reply or None
- send_exception(uid)[source]
Context manager that handles exceptions by sending them back to uid.
- Parameters:
uid (str) – Remote UID.
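A context manager of this kind can be sketched with contextlib. The version below is illustrative only: send_exception_sketch and the sent list are hypothetical, and the sketch assumes the exception is swallowed once it has been forwarded, which the real method may or may not do.

```python
import contextlib

# Hypothetical stand-in for messages sent back to remote endpoints.
sent = []


@contextlib.contextmanager
def send_exception_sketch(uid):
    """Forward any exception raised in the block to `uid`."""
    try:
        yield
    except Exception as exc:
        # Assumption: the error is reported and not re-raised locally.
        sent.append((uid, type(exc).__name__, str(exc)))


with send_exception_sketch("remote-uid"):
    raise ValueError("bad payload")

with send_exception_sketch("remote-uid"):
    pass  # no exception, so nothing is sent
```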
- send_pubsub(*args, **kwargs)[source]
Synchronously send message to remote runtime.
For arguments and return values, see Comms.send_async.
- async send_pubsub_async(*args, **kwargs)[source]
Send message to sender_id with given arguments and keyword arguments.
- Parameters:
args (tuple, optional) – Any arguments for the message.
kwargs (optional) – Keyword arguments for the method.
- Returns:
A Reply if a reply is expected, otherwise None.
- Return type:
Reply or None
- send_recv(*args, **kwargs)[source]
Synchronously send message to remote runtime and wait for reply.
For arguments and return values, see Comms.send_async.
- async send_recv_async(send_uid, *args, **kwargs)[source]
Send message to send_uid with given arguments and keyword arguments, and then wait for the reply.
- Parameters:
send_uid (str) – UID of the remote runtime.
args (tuple, optional) – Any arguments for the message.
kwargs (optional) – Keyword arguments for the method.
- Returns:
Result of the reply.
- Return type:
object
- async shake(sender_id, network)[source]
Handle confirmation of complete handshake.
- Parameters:
sender_id (str) – Remote UID.
network (dict) – Existing topology of connected sockets.
- shaken(uid)[source]
Check whether remote UID has completed handshake.
- Parameters:
uid (str) – Remote UID.
- uid_address(uid)[source]
Find remote address given UID.
- Parameters:
uid (str) – Remote UID.
- Returns:
Address.
- Return type:
str
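The handshake-related methods (handshake, hand, shake, shaken) and uid_address can be pictured together with a toy, in-memory model. The message order assumed here (handshake triggers hand on the remote side, which answers with shake carrying the known topology) is an interpretation of the descriptions above, not a confirmed protocol. NodeSketch and its registry are hypothetical, and direct method calls stand in for network messages.

```python
class NodeSketch:
    """Toy node that tracks connections and completed handshakes."""

    registry = {}  # uid -> NodeSketch, stand-in for the real network

    def __init__(self, uid, address, port):
        self.uid, self.address, self.port = uid, address, port
        self.connections = {}     # uid -> (address, port)
        self.shaken_uids = set()  # UIDs whose handshake has completed
        NodeSketch.registry[uid] = self

    def handshake(self, uid, address, port):
        # Start the handshake: connect, then announce ourselves.
        self.connections[uid] = (address, port)
        NodeSketch.registry[uid].hand(self.uid, self.address, self.port)

    def hand(self, sender_id, address, port):
        # Incoming handshake: connect back and confirm with the topology.
        self.connections[sender_id] = (address, port)
        self.shaken_uids.add(sender_id)
        network = dict(self.connections)
        NodeSketch.registry[sender_id].shake(self.uid, network)

    def shake(self, sender_id, network):
        # Confirmation: learn about the other endpoints in the topology.
        for uid, endpoint in network.items():
            if uid != self.uid:
                self.connections.setdefault(uid, endpoint)
        self.shaken_uids.add(sender_id)

    def shaken(self, uid):
        return uid in self.shaken_uids

    def uid_address(self, uid):
        return self.connections[uid][0]


head = NodeSketch("head", "10.0.0.1", 3000)
first = NodeSketch("worker:0", "10.0.0.2", 3001)
second = NodeSketch("worker:1", "10.0.0.3", 3002)

first.handshake("head", "10.0.0.1", 3000)
second.handshake("head", "10.0.0.1", 3000)
```

In this model the second worker learns the address of the first one from the topology carried by shake, even though the two never performed a handshake with each other.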