Comms

class mosaic.comms.comms.CommsManager(runtime=None, address=None, port=None, context=None, loop=None)[source]

Bases: object

Objects of this type manage the connections and message passing between different runtimes.

Parameters
  • runtime (Runtime, optional) – Current runtime, defaults to global runtime.

  • address (str) – IP address of the connection.

  • port (int) – Port to use for the connection.

  • context (zmq.Context, optional) – ZMQ socket context, defaults to global context.

  • loop (EventLoop, optional) – Event loop to use, defaults to global event loop.

property address

Connection address.

async beat(sender_id)[source]

Received beat message, the remote endpoint is alive.

Parameters

sender_id (str) – Remote UID.

async call(sender_id, method, reply, **kwargs)[source]

Run method in the loop.

Parameters
  • sender_id (str) – UID of the remote endpoint.

  • method (callable) – Method to execute

  • reply (False or str) – Whether a reply is needed and, if so, the UID of the reply.

  • kwargs (optional) – Keyword arguments for the method.

async call_safe(sender_id, method, reply, **kwargs)[source]

Run method in the loop, and within an exception handler that will process exceptions and send them back to the sender.

Parameters
  • sender_id (str) – UID of the remote endpoint.

  • method (callable) – Method to execute

  • reply (False or str) – Whether a reply is needed and, if so, the UID of the reply.

  • kwargs (optional) – Keyword arguments for the method.

cmd(*args, **kwargs)[source]

Synchronously send command to remote runtime.

For arguments and return values check Comms.cmd_async.

async cmd_async(*args, **kwargs)[source]

Send command with given arguments and keyword arguments.

Parameters
  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns

Depending on whether a reply is expected or not.

Return type

Reply or None

cmd_recv(*args, **kwargs)[source]

Synchronously send command to remote runtime and wait for reply.

For arguments and return values check Comms.cmd_async.

async cmd_recv_async(*args, **kwargs)[source]

Send command with given arguments and keyword arguments, and then wait for the reply.

Parameters
  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns

Result of the reply

Return type

object

async connect(sender_id, uid, address, port, notify=False)[source]

Create and connect outbound connection for a remote runtime, with a given address and port.

Parameters
  • uid (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

  • notify (bool, optional) – Whether or not to notify others of a new connection, defaults to False.

connect_recv()[source]

Connect inbound connection.

connect_send(uid, address, port)[source]

Create and connect outbound connection for a remote runtime, with a given address and port.

Parameters
  • uid (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

connected(uid)[source]

Check whether remote UID is connected.

Parameters

uid (str) – Remote UID.

async disconnect(sender_id, uid, notify=False)[source]

Disconnect a remote endpoint.

Parameters
  • sender_id (str) – Sender UID.

  • uid (str) – Remote UID to disconnect.

  • notify (bool, optional) – Whether or not to notify others of the disconnection, defaults to False.

disconnect_recv()[source]

Disconnect inbound connection.

disconnect_send()[source]

Connect all outbound connections.

async hand(sender_id, address, port)[source]

Handle incoming handshake.

Parameters
  • sender_id (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

async handshake(uid, address, port)[source]

Start handshake with remote uid, located at a certain address and port.

Parameters
  • uid (str) – Remote UID.

  • address (str) – Remote address.

  • port (int) – Remote port.

async heart(sender_id)[source]

Received heart message, respond with beat.

Parameters

sender_id (str) – Remote UID.

listen()[source]

Start the listening loop.

Returns

Future associated with the running loop.

Return type

concurrent.futures.Future

async listen_async()[source]

Asynchronous listening loop.

The loop waits on messages from the incoming connection, then processes them and, if necessary, passes them to the runtime.

property logger

Runtime logger.

property port

Connection port.

async process_msg(sender_id, msg)[source]

Process a received message to decide what to do with it.

Parameters
  • sender_id (str) – UID of the remote endpoint.

  • msg (Message) – Message object.

recv(**kwargs)[source]

Synchronously receive message from remote runtime.

For arguments and return values check Comms.recv_async.

async recv_async()[source]

Wait for received message from the inbound socket.

Returns

  • str – Sender UID.

  • Message – Received message.

register_reply_future(future)[source]

Register a Reply to be accessible later on.

Parameters

future (Reply) –

reply(sender_id, uid, result)[source]

Process reply from remote runtime.

Parameters
  • sender_id (str) – UID of the remote endpoint.

  • uid (str) – UID of the associated Reply.

  • result (object) – Result of the reply.

send(*args, **kwargs)[source]

Synchronously send message to remote runtime.

For arguments and return values check Comms.send_async.

async send_async(send_uid, *args, **kwargs)[source]

Send message to sender_id with given arguments and keyword arguments.

Parameters
  • send_uid (str) – UID of the remote runtime.

  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns

Depending on whether a reply is expected or not.

Return type

Reply or None

send_exception(uid)[source]

Context manager that handles exceptions by sending them back to the uid.

Parameters

uid (str) – Remote UID.

send_recv(*args, **kwargs)[source]

Synchronously send message to remote runtime and wait for reply.

For arguments and return values check Comms.send_async.

async send_recv_async(send_uid, *args, **kwargs)[source]

Send message to sender_id with given arguments and keyword arguments, and then wait for the reply.

Parameters
  • send_uid (str) – UID of the remote runtime.

  • args (tuple, optional) – Any arguments for the message.

  • kwargs (optional) – Keyword arguments for the method.

Returns

Result of the reply

Return type

object

async shake(sender_id, network)[source]

Handle confirmation of complete handshake.

Parameters
  • sender_id (str) – Remote UID.

  • network (dict) – Existing topology of connected sockets.

shaken(uid)[source]

Check whether remote UID has completed handshake.

Parameters

uid (str) – Remote UID.

async stop(sender_id)[source]

Stop the CommsManager.

Parameters

sender_id (str) – Remote UID.

uid_address(uid)[source]

Find remote address given UID.

Parameters

uid (str) – Remote UID.

Returns

Address.

Return type

str

uid_port(uid)[source]

Find remote port given UID.

Parameters

uid (str) – Remote UID.

Returns

Port.

Return type

int

wait()[source]

Wait until the listening loop of the comms is done.

async wait_for(uid)[source]

Wait until remote endpoint has connected.

Parameters

uid (str) – Remote UID.