Advanced communication protocols

Some devices require a more advanced communication protocol, e.g. due to checksums or device addresses. In most cases, it is sufficient to override Instrument.write and Instrument.read in a subclass.

Instrument’s inner workings

To adapt an instrument to a more complicated protocol, it helps to understand how the different parts work together.

The Adapter exposes write() and read() for strings, and write_bytes() and read_bytes() for bytes messages. These are the most basic methods, and they log all the traffic going through them. For the actual communication, they call private methods of the Adapter in use, e.g. VISAAdapter._read. For binary data, such as waveforms, the adapter also provides write_binary_values() and read_binary_values(), which use the aforementioned methods. You do not need to call any of these methods directly; instead, use the methods of Instrument with the same name. They call the Adapter for you and keep the code tidy.
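The layering described above can be pictured with a minimal stand-in. This is a sketch, not PyMeasure's actual code: the class name LoopbackAdapter and its internal buffer are hypothetical, and only the split between public, logging methods and private transport methods follows the text.

```python
import logging

log = logging.getLogger("sketch.adapter")


class LoopbackAdapter:
    """Hypothetical stand-in adapter that echoes back whatever was written."""

    def __init__(self):
        self._buffer = []  # pretend transport: stores the written messages

    # Public methods: log the traffic, then delegate to the private ones.
    def write(self, command):
        log.debug("WRITE:%s", command)
        self._write(command)

    def read(self):
        response = self._read()
        log.debug("READ:%s", response)
        return response

    # Private methods: the only part that touches the (pretend) hardware.
    def _write(self, command):
        self._buffer.append(command)

    def _read(self):
        return self._buffer.pop(0)


adapter = LoopbackAdapter()
adapter.write("*IDN?")
echo = adapter.read()  # the loopback returns the message written before
```

With this split, supporting a different transport would only require new private methods, while the logging in the public methods stays untouched.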

Now to Instrument. The most important methods are write() and read(), as they are the most basic building blocks for the communication. The pymeasure properties (Instrument.control and its derivatives Instrument.measurement and Instrument.setting) and probably most of your methods and properties will call them. In any instrument, write() should write a general string command to the device in such a way that the device understands it. Similarly, read() should return a general string that can be processed further.

The getter of Instrument.control does not call them directly, but via a chain of methods. It calls values() which in turn calls ask() and processes the returned string into understandable values. ask() sends the readout command via write(), waits some time if necessary via wait_for(), and reads the device response via read().

Similarly, Instrument.binary_values sends a command via write(), waits with wait_for(), but reads the response via Adapter.read_binary_values.
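The getter chain described above can be sketched with a small stand-in class. It is not PyMeasure's implementation: ChainSketch, its canned response, and the recorded call list are hypothetical, and the signatures are simplified. It only illustrates the order in which values(), ask(), write(), wait_for(), and read() call each other.

```python
import time


class ChainSketch:
    """Hypothetical stand-in that mimics the getter call chain."""

    def __init__(self):
        self.calls = []  # records the order of the low-level calls

    def write(self, command):
        self.calls.append(("write", command))

    def wait_for(self, query_delay=None):
        self.calls.append(("wait_for", query_delay))
        if query_delay:
            time.sleep(query_delay)

    def read(self):
        self.calls.append(("read",))
        return "1.5,2.5\n"  # canned device response for this sketch

    def ask(self, command, query_delay=None):
        # write the command, wait if necessary, then read the response
        self.write(command)
        self.wait_for(query_delay)
        return self.read()

    def values(self, command, separator=","):
        # turn the returned string into a list of numbers
        response = self.ask(command).strip()
        return [float(v) for v in response.split(separator)]


device = ChainSketch()
result = device.values(":VOLT:?")
```

Because everything funnels through write(), wait_for(), and read(), overriding these three methods is enough to change the protocol for all properties at once.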

Adding a device address and adding delay

Let’s look at a simple example of a device that requires its address as the first three characters of every command and returns its responses in the same style. This is straightforward: write() prepends the device address to the command, and read() strips it again after checking it. A checksum could be added in a similar way. Additionally, the device needs some time after receiving a command before it responds, so wait_for() always waits for a certain time span.

from pymeasure.instruments import Instrument


class ExtremeCommunication(Instrument):
    """Control the ExtremeCommunication instrument.

    :param address: The device address for the communication.
    :param query_delay: Wait time after writing and before reading in seconds.
    """
    def __init__(self, adapter, name="ExtremeCommunication", address=0, query_delay=0.1):
        super().__init__(adapter, name)
        self.address = f"{address:03}"
        self.query_delay = query_delay

    def write(self, command):
        """Add the device address in front of every command before sending it."""
        super().write(self.address + command)

    def wait_for(self, query_delay=0):
        """Wait for some time.

        :param query_delay: override the global query_delay.
        """
        super().wait_for(query_delay or self.query_delay)

    def read(self):
        """Read from the device and check the response.

        Assert that the response starts with the device address.
        """
        got = super().read()
        if got.startswith(self.address):
            return got[3:]
        else:
            raise ConnectionError(f"Expected message address '{self.address}', but read '{got[3:]}' for wrong address '{got[:3]}'.")

    voltage = Instrument.measurement(
        ":VOLT:?", """Measure the voltage in Volts.""")

If the device is initialized with address=12, a request for the voltage would send "012:VOLT:?" to the device and expect a response beginning with "012".
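The text mentions that a checksum could be handled in the same way as the address. The following is a minimal sketch under an assumed, hypothetical checksum scheme (sum of all character codes modulo 256, appended as two uppercase hexadecimal digits); a real device defines its own scheme in its manual. The helper names add_checksum and strip_checksum are illustrative only.

```python
def add_checksum(message):
    """Append a two-digit hex checksum (hypothetical scheme: byte sum modulo 256)."""
    checksum = sum(message.encode("ascii")) % 256
    return f"{message}{checksum:02X}"


def strip_checksum(message):
    """Verify and remove the trailing checksum, raising on a mismatch."""
    payload, received = message[:-2], message[-2:]
    expected = f"{sum(payload.encode('ascii')) % 256:02X}"
    if received != expected:
        raise ConnectionError(
            f"Checksum mismatch: expected '{expected}', but read '{received}'."
        )
    return payload


framed = add_checksum("012:VOLT:?")  # command with address, plus checksum
payload = strip_checksum(framed)  # round trip recovers the original message
```

In an instrument like the one above, write() would pass its command through add_checksum before calling super().write(), and read() would pass the result of super().read() through strip_checksum.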

Bytes communication

Some devices do not expect ASCII strings but raw bytes. In those cases, you can call write_bytes() and read_bytes() in your write() and read() methods. The following example shows an instrument whose registers are written and read via byte messages.

from pymeasure.instruments import Instrument


class ExtremeBytes(Instrument):
    """Control the ExtremeBytes instrument with byte-based communication."""
    def __init__(self, adapter, name="ExtremeBytes"):
        super().__init__(adapter, name)

    def write(self, command):
        """Write to the device according to the comma separated command.

        :param command: R or W for read or write, hexadecimal address, and data.
        """
        function, address, data = command.split(",")
        b = [0x03] if function == "R" else [0x10]
        b.extend(int(address, 16).to_bytes(2, byteorder="big"))
        b.extend(int(data).to_bytes(length=8, byteorder="big", signed=True))
        self.write_bytes(bytes(b))

    def read(self):
        """Read the response and return the data as a string, if applicable."""
        response = self.read_bytes(2)  # response type, then payload length or error code
        if response[0] == 0x00:
            raise ConnectionError(f"Device error of type {response[1]} occurred.")
        if response[0] == 0x03:
            # read that many bytes and return them as an integer
            data = self.read_bytes(response[1])
            return str(int.from_bytes(data, byteorder="big", signed=True))
        if response[0] == 0x10 and response[1] != 0x00:
            raise ConnectionError(f"Writing to the device failed with error {response[1]}")

    voltage = Instrument.control(
        "R,0x106,1", "W,0x106,%i",
        """Control the output voltage in mV.""",
    )
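To make the byte layout concrete, the frame construction from write() can be reproduced as a standalone function. This is only a sketch for inspection: build_frame is a hypothetical helper mirroring the example above, and the response bytes at the end are made up for illustration.

```python
def build_frame(command):
    """Translate a comma-separated command into the bytes write() would send."""
    function, address, data = command.split(",")
    frame = bytearray([0x03 if function == "R" else 0x10])  # function code
    frame += int(address, 16).to_bytes(2, byteorder="big")  # register address
    frame += int(data).to_bytes(8, byteorder="big", signed=True)  # data
    return bytes(frame)


# Frames for getting the voltage and for setting it to 500 mV.
read_frame = build_frame("R,0x106,1")
write_frame = build_frame("W,0x106,%i" % 500)

# Decode a made-up read response: type 0x03, two payload bytes 0x01 0xF4.
header, data = bytes([0x03, 0x02]), bytes([0x01, 0xF4])
value = int.from_bytes(data, byteorder="big", signed=True)
```

Each frame is 11 bytes long: one function byte, two address bytes, and eight data bytes. The made-up response decodes to 500.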