Comms

class mosaic.comms.comms.CommsManager(runtime=None, address=None, port=None, context=None, loop=None)[source]

Bases: object

Objects of this type manage the connections and message passing between different runtimes.

Parameters:
  • runtime (Runtime, optional) – Current runtime, defaults to global runtime.

  • address (str) – IP address of the connection.

  • port (int) – Port to use for the connection.

  • context (zmq.Context, optional) – ZMQ socket context, defaults to global context.

  • loop (EventLoop, optional) – Event loop to use, defaults to global event loop.

property address

Connection address.

async beat(sender_id)[source]

Handle a received beat message, which signals that the remote endpoint is alive.

Parameters:

sender_id (str) – Remote UID.

async call(sender_id, method, reply, **kwargs)[source]

Run method in the loop.

Parameters:
  • sender_id (str) – UID of the remote endpoint.

  • method (callable) – Method to execute.

  • reply (False or str) – Whether a reply is needed and, if so, the UID of the reply.

  • kwargs (optional) – Keyword arguments for the method.

async call_safe(sender_id, method, reply, **kwargs)[source]

Run method in the loop, and within an exception handler that will process exceptions and send them back to the sender.

Parameters:
  • sender_id (str) – UID of the remote endpoint.

  • method (callable) – Method to execute.

  • reply (False or str) – Whether a reply is needed and, if so, the UID of the reply.

  • kwargs (optional) – Keyword arguments for the method.

cmd(*args, **kwargs)[source]

Synchronously send command to remote runtime.

For arguments and return values, see CommsManager.cmd_async.

async cmd_async(*args, **kwargs)[source]

Send command with given arguments and keyword arguments.

Parameters:
  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns:

A Reply if a reply is expected, otherwise None.

Return type:

Reply or None

cmd_pubsub(*args, **kwargs)[source]

Synchronously send command to remote runtime.

For arguments and return values, see CommsManager.cmd_pubsub_async.

async cmd_pubsub_async(*args, **kwargs)[source]

Send command with given arguments and keyword arguments.

Parameters:
  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns:

A Reply if a reply is expected, otherwise None.

Return type:

Reply or None

cmd_recv(*args, **kwargs)[source]

Synchronously send command to remote runtime and wait for reply.

For arguments and return values, see CommsManager.cmd_recv_async.

async cmd_recv_async(*args, **kwargs)[source]

Send command with given arguments and keyword arguments, and then wait for the reply.

Parameters:
  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns:

Result of the reply.

Return type:

object

async connect(sender_id, uid, address, port, notify=False)[source]

Create and connect outbound connection for a remote runtime, with a given address and port.

Parameters:
  • sender_id (str) – Sender UID.

  • uid (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

  • notify (bool, optional) – Whether or not to notify others of a new connection, defaults to False.

connect_pubsub(uid, address, port)[source]

Create and connect pub-sub connection for a remote runtime, with a given address and port.

Parameters:
  • uid (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

connect_recv()[source]

Connect inbound connection.

connect_send(uid, address, port)[source]

Create and connect outbound connection for a remote runtime, with a given address and port.

Parameters:
  • uid (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

connected(uid)[source]

Check whether remote UID is connected.

Parameters:

uid (str) – Remote UID.

async disconnect(sender_id, uid, notify=False)[source]

Disconnect a remote endpoint.

Parameters:
  • sender_id (str) – Sender UID.

  • uid (str) – Remote UID to disconnect.

  • notify (bool, optional) – Whether or not to notify others of the disconnection, defaults to False.

disconnect_recv()[source]

Disconnect inbound connection.

disconnect_send()[source]

Disconnect all outbound connections.

async hand(sender_id, address, port)[source]

Handle incoming handshake.

Parameters:
  • sender_id (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

async handshake(uid, address, port, pubsub_port=None)[source]

Start handshake with remote uid, located at a certain address and port.

Parameters:
  • uid (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

  • pubsub_port (int, optional) – Publishing port.

async heart(sender_id)[source]

Handle a received heart message by responding with a beat.

Parameters:

sender_id (str) – Remote UID.
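
The heart and beat handlers together form a simple liveness check. The following is a minimal sketch of that exchange, assuming two in-process endpoints that call each other directly instead of using sockets. EndpointSketch, its peers table, and its alive set are hypothetical names introduced for illustration and are not part of mosaic.

```python
import asyncio


class EndpointSketch:
    """Hypothetical endpoint that answers `heart` with `beat`."""

    def __init__(self, uid):
        self.uid = uid
        self.peers = {}     # uid -> EndpointSketch, stands in for sockets
        self.alive = set()  # remote UIDs known to be alive

    def connect(self, other):
        self.peers[other.uid] = other

    async def heart(self, sender_id):
        # Received a heart message: respond to the sender with a beat.
        await self.peers[sender_id].beat(self.uid)

    async def beat(self, sender_id):
        # Received a beat message: the remote endpoint is alive.
        self.alive.add(sender_id)


async def main():
    head, worker = EndpointSketch("head"), EndpointSketch("worker:0")
    head.connect(worker)
    worker.connect(head)
    # The head sends `heart`, so the worker handles it with the head as sender.
    await worker.heart("head")
    return head.alive


alive = asyncio.run(main())
```

After the exchange, the head records the worker as alive. A real implementation would presumably repeat this on a timer and react to missing beats, which this sketch leaves out.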

listen()[source]

Start the listening loop.

Returns:

Future associated with the running loop.

Return type:

concurrent.futures.Future
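
A concurrent.futures.Future return type is what one gets when a coroutine is scheduled from synchronous code onto an event loop running in another thread. The sketch below shows that pattern with the standard library only. It assumes a background loop thread and uses hypothetical names (listen_sketch, listen_async_sketch), so it illustrates the general technique rather than how CommsManager is actually implemented.

```python
import asyncio
import concurrent.futures
import threading

# Assumption: the event loop runs in a background thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def listen_async_sketch(n_messages):
    """Hypothetical listening loop that handles a fixed number of messages."""
    handled = 0
    for _ in range(n_messages):
        await asyncio.sleep(0)  # stand-in for awaiting the inbound socket
        handled += 1
    return handled


def listen_sketch(n_messages):
    """Schedule the loop from synchronous code and return its future."""
    return asyncio.run_coroutine_threadsafe(listen_async_sketch(n_messages), loop)


future = listen_sketch(3)            # a concurrent.futures.Future
handled = future.result(timeout=5)   # block until the listening loop is done

# Shut the background loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

Blocking on the returned future is one way a synchronous wait could be built on top of the asynchronous loop.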

async listen_async(recv_async)[source]

Asynchronous listening loop.

The loop waits on messages from the incoming connection, then processes them and, if necessary, passes them to the runtime.

property logger

Runtime logger.

property port

Connection port.

async process_msg(sender_id, msg)[source]

Process a received message to decide what to do with it.

Parameters:
  • sender_id (str) – UID of the remote endpoint.

  • msg (Message) – Message object.
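
One common way to decide what to do with a message is to dispatch on a method name carried by the message. The sketch below assumes the message is a plain dict with "method" and "kwargs" keys and that unknown methods are handed to the runtime. Both are assumptions made for illustration, and DispatcherSketch is a hypothetical class, not the mosaic Message or CommsManager.

```python
import asyncio


class DispatcherSketch:
    # Names treated as protocol-level commands in this sketch only.
    protocol = {"heart", "stop"}

    def __init__(self):
        self.handled = []    # protocol messages handled locally
        self.forwarded = []  # messages that would go to the runtime

    async def heart(self, sender_id):
        self.handled.append(("heart", sender_id))

    async def stop(self, sender_id):
        self.handled.append(("stop", sender_id))

    async def process_msg(self, sender_id, msg):
        method = msg["method"]
        if method in self.protocol:
            # Protocol message: call the matching handler on this object.
            await getattr(self, method)(sender_id, **msg.get("kwargs", {}))
        else:
            # Anything else is assumed to belong to the runtime.
            self.forwarded.append((sender_id, method))


async def main():
    comms = DispatcherSketch()
    await comms.process_msg("worker:0", {"method": "heart"})
    await comms.process_msg("worker:0", {"method": "run_task", "kwargs": {"n": 1}})
    return comms


comms = asyncio.run(main())
```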

property pubsub_port

Pub-sub connection port.

recv(**kwargs)[source]

Synchronously receive message from remote runtime.

For arguments and return values, see CommsManager.recv_async.

async recv_async()[source]

Wait for received message from the inbound socket.

Returns:

  • str – Sender UID.

  • Message – Received message.

recv_pubsub(**kwargs)[source]

Synchronously receive message from remote runtime.

For arguments and return values, see CommsManager.recv_pubsub_async.

async recv_pubsub_async()[source]

Wait for received message from the inbound pub-sub socket.

Returns:

  • str – Sender UID.

  • Message – Received message.

register_reply_future(future)[source]

Register a Reply to be accessible later on.

Parameters:

future (Reply) – Reply to register.

reply(sender_id, uid, result)[source]

Process reply from remote runtime.

Parameters:
  • sender_id (str) – UID of the remote endpoint.

  • uid (str) – UID of the associated Reply.

  • result (object) – Result of the reply.
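
Registering a reply and later resolving it by UID can be pictured as a table of pending futures. The sketch below is one possible shape for that bookkeeping, assuming an asyncio future per pending reply. ReplySketch and ReplyRegistry are hypothetical stand-ins and do not reproduce the mosaic Reply class.

```python
import asyncio
import uuid


class ReplySketch:
    """Hypothetical stand-in for a Reply: a future tagged with a UID."""

    def __init__(self, loop):
        self.uid = uuid.uuid4().hex
        self.future = loop.create_future()


class ReplyRegistry:
    def __init__(self):
        self._pending = {}  # reply UID -> ReplySketch

    def register_reply_future(self, reply):
        # Keep the reply so it can be found when the answer arrives.
        self._pending[reply.uid] = reply

    def reply(self, sender_id, uid, result):
        # Resolve the pending reply that matches the UID, if any.
        pending = self._pending.pop(uid, None)
        if pending is not None and not pending.future.done():
            pending.future.set_result(result)


async def main():
    loop = asyncio.get_running_loop()
    registry = ReplyRegistry()
    pending = ReplySketch(loop)
    registry.register_reply_future(pending)
    # Simulate the remote answer arriving slightly later.
    loop.call_later(0.01, registry.reply, "worker:0", pending.uid, 42)
    return await pending.future


result = asyncio.run(main())
```

Awaiting the future after sending is the essence of a send-then-wait call: the caller suspends until the matching reply is processed.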

send(*args, **kwargs)[source]

Synchronously send message to remote runtime.

For arguments and return values, see CommsManager.send_async.

async send_async(send_uid, *args, **kwargs)[source]

Send message to send_uid with given arguments and keyword arguments.

Parameters:
  • send_uid (str) – UID of the remote runtime.

  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns:

A Reply if a reply is expected, otherwise None.

Return type:

Reply or None

send_exception(uid)[source]

Context manager that handles exceptions by sending them back to the uid.

Parameters:

uid (str) – Remote UID.
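
A context manager of this kind can be sketched with contextlib. The example below records the exception instead of sending it over a socket, and it suppresses the exception after reporting it. Both choices are assumptions, as are the names send_exception_sketch and sent. The real method may re-raise or format the error differently.

```python
import contextlib

sent = []  # hypothetical record of what would be sent back to the remote


@contextlib.contextmanager
def send_exception_sketch(uid):
    """Report any exception raised in the block to `uid`."""
    try:
        yield
    except Exception as error:
        # Stand-in for sending the error to the remote endpoint.
        sent.append((uid, type(error).__name__, str(error)))
        # Assumption: the exception is suppressed once reported.


with send_exception_sketch("head"):
    raise ValueError("bad input")
```

Wrapping a remote call in such a block keeps a failure on one endpoint from going unnoticed by the endpoint that requested the work.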

send_pubsub(*args, **kwargs)[source]

Synchronously send message to remote runtime.

For arguments and return values, see CommsManager.send_pubsub_async.

async send_pubsub_async(*args, **kwargs)[source]

Send message over the pub-sub connection with given arguments and keyword arguments.

Parameters:
  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns:

A Reply if a reply is expected, otherwise None.

Return type:

Reply or None

send_recv(*args, **kwargs)[source]

Synchronously send message to remote runtime and wait for reply.

For arguments and return values, see CommsManager.send_recv_async.

async send_recv_async(send_uid, *args, **kwargs)[source]

Send message to send_uid with given arguments and keyword arguments, and then wait for the reply.

Parameters:
  • send_uid (str) – UID of the remote runtime.

  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns:

Result of the reply.

Return type:

object

async shake(sender_id, network)[source]

Handle confirmation of complete handshake.

Parameters:
  • sender_id (str) – Remote UID.

  • network (dict) – Existing topology of connected sockets.

shaken(uid)[source]

Check whether remote UID has completed handshake.

Parameters:

uid (str) – Remote UID.

start_heartbeat(uid)[source]

Start the heartbeat procedure.

Parameters:

uid (str) – Remote UID.

async stop(sender_id)[source]

Stop the CommsManager.

Parameters:

sender_id (str) – Remote UID.

uid_address(uid)[source]

Find remote address given UID.

Parameters:

uid (str) – Remote UID.

Returns:

Address.

Return type:

str

uid_port(uid)[source]

Find remote port given UID.

Parameters:

uid (str) – Remote UID.

Returns:

Port.

Return type:

int

wait()[source]

Wait until the listening loop of the comms is done.

async wait_for(uid)[source]

Wait until remote endpoint has connected.

Parameters:

uid (str) – Remote UID.
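
Waiting for a remote endpoint to connect is typically built on an event that is set once the connection is established. The sketch below assumes one asyncio.Event per UID. ConnectionTracker and mark_connected are hypothetical names used only to illustrate the idea behind connected and wait_for.

```python
import asyncio
from collections import defaultdict


class ConnectionTracker:
    def __init__(self):
        # One event per remote UID, created on first use.
        self._events = defaultdict(asyncio.Event)

    def mark_connected(self, uid):
        self._events[uid].set()

    def connected(self, uid):
        return uid in self._events and self._events[uid].is_set()

    async def wait_for(self, uid):
        # Suspend until the UID has been marked as connected.
        await self._events[uid].wait()


async def main():
    tracker = ConnectionTracker()
    before = tracker.connected("worker:0")
    # Simulate the remote endpoint connecting slightly later.
    asyncio.get_running_loop().call_later(0.01, tracker.mark_connected, "worker:0")
    await asyncio.wait_for(tracker.wait_for("worker:0"), timeout=1.0)
    return before, tracker.connected("worker:0")


before, after = asyncio.run(main())
```

The timeout around the wait is a defensive choice in this sketch so that a missing connection does not block forever.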