diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index b7ac9fbd4..4616efb0b 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -28,3 +28,6 @@ Jan Goeteyn "ykzheng" Lear Corporation Nick Black +Michael Ohtake +Mark Ciechanowski +DG Technologies, Inc. \ No newline at end of file diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index 1fa3f5b4b..04cd8082f 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -24,7 +24,8 @@ "canalystii": ("can.interfaces.canalystii", "CANalystIIBus"), "systec": ("can.interfaces.systec", "UcanBus"), "seeedstudio": ("can.interfaces.seeedstudio", "SeeedBus"), - "cantact": ("can.interfaces.cantact", "CantactBus"), + "cantact": ("can.interfaces.cantact", "CantactBus"), + "dg": ("can.interfaces.dg", "DGBus"), } BACKENDS.update( diff --git a/can/interfaces/dg/__init__.py b/can/interfaces/dg/__init__.py new file mode 100644 index 000000000..8d546393d --- /dev/null +++ b/can/interfaces/dg/__init__.py @@ -0,0 +1 @@ +from can.interfaces.dg.dg import DGBus diff --git a/can/interfaces/dg/dg.py b/can/interfaces/dg/dg.py new file mode 100644 index 000000000..4a9e647f5 --- /dev/null +++ b/can/interfaces/dg/dg.py @@ -0,0 +1,456 @@ +#!/usr/bin/python +# +# ---------------------------------------------------------------------- +# pylint: disable=invalid-name +# ---------------------------------------------------------------------- +# +# ********************************************************************** +# +# File Name: dg.py +# Author(s): mohtake +# Target Project: python-can +# Description: +# Notes: +# ********************************************************************** +# + +"""DG BEACON python-can module""" + +# ---------------------------------------------------------------------- +import weakref +from can import BusABC, Message +from can.interfaces.dg.dg_gryphon_protocol import server_commands +from can.broadcastmanager import ( + LimitedDurationCyclicSendTaskABC, + ModifiableCyclicTaskABC, +) + + +class DGBus(BusABC): + """ Beacon python-can backend API """ + + def __init__(self, channel=1, **kwargs): + """ + :param int channel: + Channel which the bus will be initialized on. + + :param bool is_fd: + Specifies if bus is CANFD or not. + + :param str ip: + IP at which the DG device is connected. + + :param int bitrate: + Bitrate for the bus. + + :param bool termination: + Termination for the bus (on or off). + + :param int data_bitrate: + The data phase bitrate for the bus (Optional, only if is_fd is + True). + + :param bool pre_iso: + Specifies if bus should be CANFD PREISO (Optional, only if is_fd is + True). + + :return: None + """ + self.ip = kwargs["ip"] if "ip" in kwargs else "localhost" + self.is_fd = kwargs["is_fd"] if "is_fd" in kwargs else False + self.bitrate = kwargs["bitrate"] if "bitrate" in kwargs else 500000 + self.termination = kwargs["termination"] if "termination" in kwargs else True + self.pre_iso = kwargs["pre_iso"] if "pre_iso" in kwargs else False + self.data_bitrate = ( + kwargs["data_bitrate"] if "data_bitrate" in kwargs else 2000000 + ) + self.channel = channel + + self.beacon = server_commands.Gryphon(self.ip) + self.beacon.CMD_SERVER_REG("root", "dgbeacon") + self.beacon.CMD_BCAST_ON() + if not self.is_fd: + self.mode = "CAN" + temp_mode = [0] + elif self.pre_iso: + self.mode = "CANPREISO" + temp_mode = [2] + else: + self.mode = "CANFD" + temp_mode = [1] + temp_databitr = DGBus._int_to_list(self.data_bitrate) + temp_databitr.reverse() + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_GSETFASTBITRATE, data_in=temp_databitr + ) + + temp_bitr = DGBus._int_to_list(self.bitrate) + temp_bitr.reverse() + temp_term = [1] if self.termination else [0] + + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_SETINTTERM, data_in=temp_term + ) + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_GSETBITRATE, data_in=temp_bitr + ) + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_GCANSETMODE, data_in=temp_mode + ) + self.beacon.CMD_INIT(self.channel, value_in=self.beacon.ALWAYS_INIT) + self.beacon.CMD_CARD_SET_FILTER_MODE( + self.channel, self.beacon.FILTER_OFF_PASS_ALL + ) + self.channel_info = ("dg channel '%s' on " + self.mode) % self.channel + super(DGBus, self).__init__(self.channel, None) + + @staticmethod + def _int_to_list(num): + """ + Internal method to take a decimal int and convert to lst of int bytes + + :param int num: + Number (decimal) to be converted + :return: + list of bytes (MSB to LSB) + :rtype: + list + """ + hdrTemp = hex(num).rstrip("L")[2:] + if len(hdrTemp) % 2 != 0: + hdrTemp = "0" + hdrTemp + return [int(hdrTemp[i : i + 2], 16) for i in range(0, len(hdrTemp), 2)] + + @staticmethod + def _list_to_int(datab): + """ + Internal method to take a list of int bytes and convert to a decimal + int + + :param list datab: + List of bytes to be converted (MSB to LSB) + :return: + Decimal int corresponding to list + :rtype: + int + """ + header = "0x" + for item in datab: + if item < 16: + header = header + "0" + hex(item)[2:] + else: + header = header + hex(item)[2:] + return int(header, 16) + + @classmethod + def msg_to_dict(cls, msg): + """ + Method to convert Message type to gryph dict + + :param can.Message msg: + Message object to be converted + :return: + Gryphon compatible dictionary + :rtype: + dict + + Conversion table:: + + float msg.timestamp -> msgForm["timestamp"] (N/A) + bool msg.is_remote_frame -> N/A + bool msg.is_extended_id -> msgForm["hdrlen"] + bool msg.is_error_frame -> N/A + int msg.arbitration_id -> msgForm["hdr"], + msgForm["hdrlen"] + int msg.dlc -> N/A + bytearray msg.data -> msgForm["data"] + bool msg.is_fd -> N/A + bool msg.bitrate_switch -> N/A + bool msg.error_state_indicator-> N/A + int msg.channel -> N/A + """ + msgForm = {} + msgForm["hdr"] = cls._int_to_list(msg.arbitration_id) + msgForm["hdrlen"] = len(msgForm["hdr"]) + msgForm["data"] = msg.data + return msgForm + + @classmethod + def _dict_to_msg(cls, msgForm): + """ + Method to encode gryph dict to Message + + :param dict msgForm: + Dictionary to be converted + :return: + Message object + :rtype: + can.Message + """ + timestamp = msgForm["GCprotocol"]["body"]["data"]["timestamp"] / 1000000.0 + headerForm = cls._list_to_int(msgForm["GCprotocol"]["body"]["data"]["hdr"]) + data = msgForm["GCprotocol"]["body"]["data"]["data"] + extId = msgForm["GCprotocol"]["body"]["data"]["hdrlen"] == 8 + isFD = msgForm["GCprotocol"]["body"]["data"]["status"] == 48 + return Message( + timestamp=timestamp, + arbitration_id=headerForm, + data=data, + is_extended_id=extId, + is_fd=isFD, + ) + + def send(self, msg, timeout=None): + """send a can message + """ + msgForm = self.msg_to_dict(msg) + self.beacon.FT_DATA_TX(self.channel, msgForm) + + def _send_periodic_internal(self, msg, period, duration=None): + """send a periodic can message + """ + msgForm = {} + msgForm["message_list"] = [self.msg_to_dict(msg[0])] + msgForm["message_list"][0]["tx_period"] = int(period * 1000000) + msgForm["message_list"][0]["period_in_microsec"] = True + if duration is None: + cycles = 4294967295 + else: + cycles = int(duration / period) + reply = self.beacon.CMD_SCHED_TX(self.channel, msgForm, iterations=cycles) + return Scheduling( + msg, + period, + duration, + reply["schedule_id"], + weakref.ref(self.beacon), + self.channel, + ) + + def _recv_internal(self, timeout=None): + """wait for a received can message from BEACON, + or timeout + """ + if timeout is None: + reply = self.beacon.FT_DATA_WAIT_FOR_RX(timeout=0.5) + while reply is None: + reply = self.beacon.FT_DATA_WAIT_FOR_RX(timeout=0.5) + return self._dict_to_msg(reply), True + + reply = self.beacon.FT_DATA_WAIT_FOR_RX(timeout=timeout) + if reply is not None: + return self._dict_to_msg(reply), True + return None, True + + def _apply_filters(self, filters): + """set the filters on the BEACON + """ + if filters is None: + reply = self.beacon.CMD_CARD_GET_FILTER_HANDLES(self.channel) + for item in reply["GCprotocol"]["body"]["data"]["filter_handles"]: + self.beacon.CMD_CARD_MODIFY_FILTER( + self.channel, self.beacon.DELETE_FILTER, filter_handle=item + ) + self.beacon.CMD_CARD_SET_DEFAULT_FILTER( + self.channel, self.beacon.DEFAULT_FILTER_PASS + ) + return + dataFil = {} + counter = 0 + dataFil["flags"] = ( + self.beacon.FILTER_FLAG_PASS + | self.beacon.FILTER_FLAG_ACTIVE + | self.beacon.FILTER_FLAG_OR_BLOCKS + | self.beacon.FILTER_FLAG_SAMPLING_INACTIVE + ) + dataFil["filter_blocks"] = [] + for item in filters: + dataFil["filter_blocks"].append({}) + dataFil["filter_blocks"][counter]["byte_offset"] = 0 + dataFil["filter_blocks"][counter][ + "data_type" + ] = self.beacon.FILTER_DATA_TYPE_HEADER + dataFil["filter_blocks"][counter]["operator"] = self.beacon.BIT_FIELD_CHECK + dataFil["filter_blocks"][counter]["mask"] = self._int_to_list( + item["can_mask"] + ) + dataFil["filter_blocks"][counter]["pattern"] = self._int_to_list( + item["can_id"] + ) + counter += 1 + self.beacon.CMD_CARD_ADD_FILTER(self.channel, dataFil) + self.beacon.CMD_CARD_SET_FILTER_MODE(self.channel, self.beacon.FILTER_ON) + self.beacon.CMD_CARD_SET_DEFAULT_FILTER( + self.channel, self.beacon.DEFAULT_FILTER_BLOCK + ) + + def shutdown(self): + """stop the BEACON + """ + del self.beacon + + def flush_tx_buffer(self): + """re-initialize the can channel + """ + self.beacon.CMD_INIT(self.channel, value_in=self.beacon.ALWAYS_INIT) + + def detect_channel_config(self, channel): + """ + Detect the current configuration of a specified channel + + :param int channel: + The channel to detect the config of + :return: + A dict describing the config of the channel + :rtype: + dict + """ + temp = {} + modeInt = self.beacon.CMD_CARD_IOCTL( + channel, self.beacon.IOCTL_GCANGETMODE, data_in=[0] + ) + modeInt = modeInt["GCprotocol"]["body"]["data"]["ioctl_data"][0] + if modeInt == 0: + is_fd = False + elif modeInt == 1: + is_fd = True + pre_iso = False + else: + is_fd = True + pre_iso = True + + bitrArr = self.beacon.CMD_CARD_IOCTL( + channel, self.beacon.IOCTL_GGETBITRATE, data_in=[0, 0, 0, 0] + ) + bitrArr = bitrArr["GCprotocol"]["body"]["data"]["ioctl_data"] + bitrArr.reverse() + bitr = DGBus._list_to_int(bitrArr) + + termInt = self.beacon.CMD_CARD_IOCTL( + channel, self.beacon.IOCTL_GETINTTERM, data_in=[0] + ) + termInt = termInt["GCprotocol"]["body"]["data"]["ioctl_data"][0] + term = termInt == 1 + + if is_fd: + dataArr = self.beacon.CMD_CARD_IOCTL( + channel, self.beacon.IOCTL_GGETFASTBITRATE, data_in=[0, 0, 0, 0] + ) + dataArr = dataArr["GCprotocol"]["body"]["data"]["ioctl_data"] + dataArr.reverse() + data = DGBus._list_to_int(dataArr) + temp["data_bitrate"] = data + temp["pre_iso"] = pre_iso + + temp["channel"] = channel + temp["is_fd"] = is_fd + temp["ip"] = self.ip + temp["bitrate"] = bitr + temp["termination"] = term + return temp + + @staticmethod + def _detect_available_configs(): + """ Returns list of dicts that contains available configs for the + beacon + """ + return [{"interface": "dg", "channel": channel} for channel in range(1, 9)] + + # + # METHODS SPECIFIC TO DG INTERFACE + # + def set_mode(self, new_mode): + """ + When going from CAN to CANFD or CANPREISO, this method will default + to the previously set bitrate. Call :meth:`set_databitr` to set bitrate + before switching modes. + + :param str new_mode: + The new mode for the bus, either "CAN", "CANFD", or "CANPREISO + """ + self.mode = new_mode + if new_mode == "CAN": + temp_mode = [0] + self.is_fd = False + self.pre_iso = False + elif new_mode == "CANFD": + temp_mode = [1] + self.is_fd = True + self.pre_iso = False + else: + temp_mode = [2] + self.is_fd = True + self.pre_iso = False + + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_GCANSETMODE, data_in=temp_mode + ) + self.beacon.CMD_INIT(self.channel, value_in=self.beacon.ALWAYS_INIT) + + def set_bitrate(self, new_bitr): + """ + :param int new_bitr: + The new bitrate + """ + self.bitrate = new_bitr + temp_bitrate = DGBus._int_to_list(new_bitr) + temp_bitrate.reverse() + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_GSETBITRATE, data_in=temp_bitrate + ) + self.beacon.CMD_INIT(self.channel, value_in=self.beacon.ALWAYS_INIT) + + def set_termination(self, new_term): + """ + :param bool new_term: + Set termination to on (True) or off (False) + """ + self.termination = new_term + temp_term = [1] if new_term else [0] + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_SETINTTERM, data_in=temp_term + ) + self.beacon.CMD_INIT(self.channel, value_in=self.beacon.ALWAYS_INIT) + + def set_databitr(self, new_databitr): + """ + :param int new_databitr: + The new data phase bitrate + """ + self.data_bitrate = new_databitr + temp_databitr = DGBus._int_to_list(new_databitr) + temp_databitr.reverse() + self.beacon.CMD_CARD_IOCTL( + self.channel, self.beacon.IOCTL_GSETFASTBITRATE, data_in=temp_databitr + ) + self.beacon.CMD_INIT(self.channel, value_in=self.beacon.ALWAYS_INIT) + + def set_event_rx(self, state): + """ + :param bool state: + Set event enabling to on (True) or off (False) + """ + if state: + self.beacon.CMD_EVENT_ENABLE(self.channel) + else: + self.beacon.CMD_EVENT_DISABLE(self.channel, 0) + + +class Scheduling(LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC): + """ Class that handles task interface between Beacon scheduler and + python-can + """ + + def __init__(self, message, period, duration, idIn, _beacon, channel): + super(Scheduling, self).__init__(message, period, duration) + self.idIn = idIn + self.beaconref = _beacon + self.channel = channel + + def modify_data(self, message): + self.beaconref().CMD_SCHED_MSG_REPLACE( + self.idIn, DGBus.msg_to_dict(message), index=0 + ) + + def stop(self): + self.beaconref().CMD_SCHED_KILL_TX(self.channel, self.idIn) diff --git a/doc/interfaces.rst b/doc/interfaces.rst index bd7a0d1df..4cfe507e7 100644 --- a/doc/interfaces.rst +++ b/doc/interfaces.rst @@ -26,6 +26,7 @@ The available interfaces are: interfaces/virtual interfaces/canalystii interfaces/systec + interfaces/dg interfaces/seeedstudio Additional interfaces can be added via a plugin interface. An external package diff --git a/doc/interfaces/dg.rst b/doc/interfaces/dg.rst new file mode 100644 index 000000000..e2ebadac2 --- /dev/null +++ b/doc/interfaces/dg.rst @@ -0,0 +1,23 @@ +DG Interface +================ + +Interface to `DG Technologies `__ Beacon + + +Bus +--- + +.. autoclass:: can.interfaces.dg.dg.DGBus + + +Capability +------------- +The ``DGBus`` class implements several python-can methods:: + + send, send_periodic, recv, set_filters, flush_tx_buffer, _detect_available_configs, shutdown + +The ``Scheduling`` class supports both ``Limited Duration Tasks`` and ``Modifiable Tasks`` + +The Bus also implements several Bus-Specific methods:: + + set_mode, set_bitrate, set_databitr, set_event_rx diff --git a/test/test_dg.py b/test/test_dg.py new file mode 100644 index 000000000..fa900b067 --- /dev/null +++ b/test/test_dg.py @@ -0,0 +1,613 @@ +#!/usr/bin/python +# +# ---------------------------------------------------------------------- +# pylint: disable=invalid-name +# ---------------------------------------------------------------------- +# +# ********************************************************************** +# File Name: test_dg.py +# Author(s): mohtake +# Target Project: python-can +# Description: +# Notes: +# ********************************************************************** +# + +"""DG python-can unittest module""" + +import time + +try: + from queue import Queue +except ImportError: + from Queue import Queue +import threading +from threading import Thread +import unittest + +try: + from unittest.mock import Mock +except ImportError: + from mock import Mock + +# ---------------------------------------------------------------------- +import can +from can import Message +from can.interfaces.dg.dg_gryphon_protocol import server_commands + + +class mockedBus: + """ Mocked Bus for testing DG Interface""" + + def __init__(self): + """Basic init for each test, sets up queue, mock, threads, etc.""" + self.q = Queue() + self.threads = [] + self.events = [] + self.filters = [] + self.filtOn = False + self.filtReply = { + "GCprotocol": {"body": {"data": {"filter_handles": ["dummy"]}}} + } + self.ioctlReply = { + "GCprotocol": {"body": {"data": {"ioctl_data": [0x01, 0x03, 0xFF]}}} + } + server_commands.Gryphon.__init__ = Mock(return_value=None) + server_commands.Gryphon.__del__ = Mock(return_value=None) + server_commands.Gryphon.CMD_SERVER_REG = Mock() + server_commands.Gryphon.FT_DATA_TX = Mock(side_effect=self.add_queue) + server_commands.Gryphon.CMD_BCAST_ON = Mock() + server_commands.Gryphon.CMD_CARD_IOCTL = Mock(return_value=self.ioctlReply) + server_commands.Gryphon.CMD_INIT = Mock() + server_commands.Gryphon.CMD_SCHED_TX = Mock(side_effect=self.send_sched) + server_commands.Gryphon.FT_DATA_WAIT_FOR_RX = Mock(side_effect=self.get_queue) + server_commands.Gryphon.CMD_CARD_GET_FILTER_HANDLES = Mock( + return_value=self.filtReply + ) + server_commands.Gryphon.CMD_CARD_MODIFY_FILTER = Mock() + server_commands.Gryphon.CMD_CARD_SET_DEFAULT_FILTER = Mock( + side_effect=self.del_filt + ) + server_commands.Gryphon.CMD_CARD_ADD_FILTER = Mock(side_effect=self.set_filt) + server_commands.Gryphon.CMD_CARD_SET_FILTER_MODE = Mock( + side_effect=self.filt_mode + ) + server_commands.Gryphon.CMD_SCHED_MSG_REPLACE = Mock( + side_effect=self.channelge_sched + ) + server_commands.Gryphon.CMD_SCHED_KILL_TX = Mock(side_effect=self.kill_sched) + server_commands.Gryphon.CMD_EVENT_ENABLE = Mock() + server_commands.Gryphon.CMD_EVENT_DISABLE = Mock() + + def filt_mode(self, *args): + """Set the filter mode to on or off""" + if args[1] == 5: + self.filtOn = True + else: + self.filtOn = False + + def del_filt(self, *args): + """Delete a filter if SET_DEFAULT_FILTER is called""" + if args[1] == 1: + self.filters = [] + self.filtOn = False + + def set_filt(self, *args): + """Setup a filter""" + filts = args[1]["filter_blocks"] + for i in range(0, len(filts)): + self.filters.append(filts[i]["pattern"]) + + def filt_check(self, hdr): + """Make filter check before sending to queue""" + if hdr in self.filters: + return True + return False + + def add_queue(self, *args): + """Add a message to the message queue""" + if not self.filtOn or self.filt_check(args[1]["hdr"]): + self.q.put(args[1]) + + def get_queue(self, **kwargs): + """Get a message from the message queue, if there are any""" + try: + x = self.q.get(timeout=kwargs["timeout"]) + except: + return None + rxReply = {"GCprotocol": {"body": {"data": x}}} + rxReply["GCprotocol"]["body"]["data"]["timestamp"] = 100000 + rxReply["GCprotocol"]["body"]["data"]["status"] = 20 + return rxReply + + def send_msg_thread(self, msg, wait, iterations, tEv): + """Thread for sending a periodic message""" + wait = wait / 1000000.0 + if tEv is not None: + while not tEv.is_set(): + self.add_queue(None, msg) + time.sleep(wait) + else: + for _ in range(0, iterations): + self.add_queue(None, msg) + time.sleep(wait) + + def send_sched(self, *args, **kwargs): + """Setup up the scheduler message""" + txMsg = args[1]["message_list"][0] + wait = txMsg["tx_period"] + txMsg.pop("tx_period") + txMsg.pop("period_in_microsec") + tEv = threading.Event() + t = Thread( + target=self.send_msg_thread, args=(txMsg, wait, kwargs["iterations"], tEv) + ) + t.daemon = True + self.threads.append(t) + self.events.append(tEv) + t.start() + return {"schedule_id": len(self.threads)} + + def channelge_sched(self): + """Not currently implemented""" + raise NotImplementedError + + def kill_sched(self, *args): + """Kill a schedule and wait for join""" + self.events[args[1] - 1].set() + self.threads[args[1] - 1].join() + + +class test_dg(unittest.TestCase): + """ Tests dg interface""" + + def setUp(self): + """Setup mockedBus for each test""" + # IF YOU WANT TO RUN THE ACTUAL TESTS WITHOUT THE MOCKED BUS, + # MEANING YOU HAVE A BEACON CONNECTED AT SOME IP ADDRESS, THEN + # COMMENT OUT THE NEXT LINE (mockedBus()) AND ENTER THE IP + # INTO self.ip + mockedBus() + self.ip = "localhost" + + def test_filter_schedule(self): + """ + REQUIRES: shutdown, task.stop_all_periodic_tasks + TESTS: _apply_filters(filter), _send_periodic_internal() + """ + print("\ntest_filter_schedule") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x05EA, data=[12, 255, 29, 152]) + bus.send_periodic(msg, 0.1) + filt = [{"can_id": 0x054A, "can_mask": 0xFFFF, "extended": True}] + time.sleep(1) + bus.set_filters(filt) + for _ in range(0, 50): + bus.recv(timeout=0) + reply = bus.recv(timeout=0) + self.assertEqual(reply, None) + finally: + bus.stop_all_periodic_tasks(remove_tasks=False) + bus.shutdown() + + def test_RX_TX(self): + """ + REQUIRES: shutdown + TESTS: _recv_internal, send + """ + print("\ntest_RX_TX") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x05EA, data=[12, 255, 29, 152]) + bus.send(msg) + reply = bus.recv() + self.assertEqual(reply.arbitration_id, 0x05EA) + self.assertEqual(reply.data, bytearray([12, 255, 29, 152])) + finally: + bus.shutdown() + + def test_configs(self): + """ + REQUIRES: shutdown + TESTS: _detect_available_configs + """ + print("\ntest_configs") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + conf = can.interfaces.dg.dg.DGBus._detect_available_configs() + for item in conf: + self.assertTrue(isinstance(item["channel"], int)) + finally: + bus.shutdown() + + def test_flush(self): + """ + REQUIRES: shutdown + TESTS: flush_tx_buffer + """ + print("\ntest_flush") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + bus.flush_tx_buffer() + self.assertTrue(True) + finally: + bus.shutdown() + + def test_filter_simple(self): + """ + REQUIRES: shutdown, _recv_internal, send + TESTS: _apply_filters(filt) + """ + print("\ntest_filter_simple") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x05EA, data=[12, 255, 29, 152]) + filt = [{"can_id": 0x054A, "can_mask": 0xFFFF, "extended": True}] + bus.set_filters(filters=filt) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(None, reply) + finally: + bus.shutdown() + + def test_filter_shutoff(self): + """ + REQUIRES: shutdown, _recv_internal, send, _apply_filters(filt) + TESTS: _apply_filters() + """ + print("\ntest_filter_shutoff") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x01EF, data=[12, 255, 29, 152]) + filt = [{"can_id": 0x054A, "can_mask": 0xFFFF, "extended": True}] + bus.set_filters(filters=filt) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(None, reply) + bus.set_filters() + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x01EF) + finally: + bus.shutdown() + + def test_long_filter(self): + """ + REQUIRES: shutdown, _recv_internal, send + TESTS: _apply_filters(filt) + """ + print("\ntest_long_filter") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x0132, data=[12, 255, 29, 152]) + filt = [{"can_id": 0x01E00132, "can_mask": 0xFFFFFFFF, "extended": True}] + bus.set_filters(filters=filt) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(None, reply) + msg.arbitration_id = 0x01E00132 + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x01E00132) + msg.arbitration_id = 0x01E0A132 + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply, None) + time.sleep(0.5) + # Above is needed so filter doesn't mess with other tests + finally: + bus.shutdown() + + def test_add_and_clear_filter(self): + """ + REQUIRES: shutdown, _recv_internal, send + TESTS: _apply_filters(filt), _apply_filters() + """ + print("\ntest_add_and_clear_filter") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x01E2, data=[12, 255, 29, 182]) + msg2 = Message(arbitration_id=0x01E00132, data=[12, 255, 29, 152]) + msg3 = Message(arbitration_id=0x04EA, data=[12, 255, 29, 152]) + filt = [{"can_id": 0x01E00132, "can_mask": 0xFFFFFFFF, "extended": True}] + bus.set_filters(filters=filt) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply, None) + filt2 = [{"can_id": 0x01E2, "can_mask": 0xFFFF}] + bus.set_filters(filters=filt2) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x01E2) + bus.send(msg2) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x01E00132) + bus.set_filters() + bus.send(msg3) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x04EA) + finally: + bus.shutdown() + + def test_simple_sched(self): + """ + REQUIRES: shutdown, _recv_internal + TESTS: _send_periodic_internal(), task.stop + """ + print("\ntest_simple_sched") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x01E2, data=[12, 255, 29, 252]) + task = bus.send_periodic(msg, 0.5) + time.sleep(1.1) + task.stop() + for _ in range(0, 3): + reply = bus.recv(timeout=0) + self.assertEqual(reply.arbitration_id, 0x01E2) + finally: + bus.stop_all_periodic_tasks(remove_tasks=False) + bus.shutdown() + + # Not going to bother with modifying schedules at the moment + def _test_alter_sched(self): + """ + REQUIRES: shutdown, _recv_internal, _send_periodic_internal(), task.stop + TESTS: task.modify_data + """ + print("\ntest_alter_sched") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x01E2, data=[12, 255, 29, 160]) + task = bus.send_periodic(msg, 0.5) + time.sleep(1.6) + msg.data = [34, 13, 22, 1] + task.modify_data(msg) + time.sleep(1.6) + task.stop() + for _ in range(0, 4): + reply = bus.recv(timeout=0) + self.assertEqual(reply.arbitration_id, 0x01E2) + for _ in range(0, 3): + reply = bus.recv(timeout=0) + self.assertEqual(reply.data, bytearray([34, 13, 22, 1])) + reply = bus.recv(timeout=0) + self.assertEqual(reply, None) + finally: + bus.stop_all_periodic_tasks(remove_tasks=False) + bus.shutdown() + + def test_mult_sched(self): + """ + REQUIRES: shutdown, _recv_internal + TESTS: _send_periodic_internal(duration) + """ + print("\ntest_mult_sched") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg1 = Message(arbitration_id=0x01EE, data=[12, 255]) + msg2 = Message(arbitration_id=0x043EA209, data=[16, 211, 15]) + bus.send_periodic(msg1, 0.5, 3.2) + time.sleep(0.25) + bus.send_periodic(msg2, 0.5, 3.2) + for _ in range(0, 6): + reply = bus.recv(timeout=0.6) + self.assertEqual(reply.arbitration_id, 0x01EE) + reply = bus.recv(timeout=0.6) + self.assertEqual(reply.arbitration_id, 0x043EA209) + reply = bus.recv(timeout=0) + self.assertEqual(reply, None) + finally: + bus.shutdown() + + def test_stop_all_tasks(self): + """ + REQUIRES: shutdown, _recv_internal + TESTS: stop_all_periodic_tasks + """ + print("\ntest_stop_all_tasks") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg1 = Message(arbitration_id=0x01FF4332, data=[12, 255, 29, 152]) + msg2 = Message(arbitration_id=0x043EA209, data=[16, 211]) + msg3 = Message(arbitration_id=0x02090001, data=[16, 211, 15, 22]) + msg5 = Message(arbitration_id=0x0401, data=[16, 211, 15, 22]) + bus.send_periodic(msg1, 0.5, 3, store_task=True) + bus.send_periodic(msg2, 0.4, 6, store_task=True) + bus.send_periodic(msg3, 0.1, 4, store_task=True) + bus.send_periodic(msg5, 0.1, 4, store_task=True) + bus.stop_all_periodic_tasks(remove_tasks=False) + time.sleep(1) + for _ in range(1, 50): + bus.recv(timeout=0) + reply = bus.recv(timeout=0) + self.assertEqual(reply, None) + finally: + bus.stop_all_periodic_tasks(remove_tasks=False) + bus.shutdown() + + def test_TS_RX_TX(self): + """ + THREAD-SAFE + REQUIRES: shutdown + TESTS: send, _recv_internal + """ + print("\ntest_TS_RX_TX") + bus = can.ThreadSafeBus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg = Message(arbitration_id=0x05EA, data=[122, 122, 122, 122]) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x05EA) + self.assertEqual(reply.data, bytearray([122, 122, 122, 122])) + finally: + bus.shutdown() + + def test_CANFD_RX_TX(self): + """ + REQUIRES: shutdown + TESTS: send, _recv_internal + """ + print("\ntest_CANFD_RX_TX") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=True, ip=self.ip) + bus2 = can.interface.Bus(bustype="dg", channel=2, is_fd=True, ip=self.ip) + try: + msg = Message(arbitration_id=0x05EA, data=[122, 122, 122, 122]) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x05EA) + self.assertEqual(reply.data, bytearray([122, 122, 122, 122])) + finally: + bus.shutdown() + bus2.shutdown() + + def test_PREISO_RX_TX(self): + """ + REQUIRES: shutdown + TESTS: send, _recv_internal + """ + print("\ntest_PREISO_RX_TX") + bus = can.interface.Bus( + bustype="dg", channel=1, is_fd=True, ip=self.ip, pre_iso=True + ) + bus2 = can.interface.Bus( + bustype="dg", channel=2, is_fd=True, ip=self.ip, pre_iso=True + ) + try: + msg = Message(arbitration_id=0x05EA, data=[122, 122, 122, 122]) + bus.send(msg) + reply = bus.recv(timeout=1) + self.assertEqual(reply.arbitration_id, 0x05EA) + self.assertEqual(reply.data, bytearray([122, 122, 122, 122])) + finally: + bus.shutdown() + bus2.shutdown() + + def test_TS_stop_all_tasks(self): + """ + THREAD-SAFE + REQUIRES: shutdown, _recv_internal + TESTS: _send_periodic_internal, _send_periodic_internal(duration) + """ + print("\ntest_TS_stop_all_tasks") + bus = can.ThreadSafeBus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + msg1 = Message(arbitration_id=0x01FF4332, data=[12, 255, 29, 152]) + msg2 = Message(arbitration_id=0x043EA209, data=[16, 211]) + msg3 = Message(arbitration_id=0x02090001, data=[16, 211, 15, 22]) + msg4 = Message(arbitration_id=0x02090001, data=[16, 211, 15, 22]) + msg5 = Message(arbitration_id=0x0401, data=[16, 211, 15, 22]) + bus.send_periodic(msg1, 0.5, 3, store_task=True) + bus.send_periodic(msg2, 0.4, 6, store_task=True) + bus.send_periodic(msg3, 0.1, store_task=True) + bus.send_periodic(msg4, 0.1, 4, store_task=True) + bus.send_periodic(msg5, 0.1, 4, store_task=True) + bus.stop_all_periodic_tasks(remove_tasks=False) + time.sleep(1) + for _ in range(1, 50): + bus.recv(timeout=0) + reply = bus.recv(timeout=0) + self.assertEqual(reply, None) + finally: + bus.stop_all_periodic_tasks(remove_tasks=False) + bus.shutdown() + + def test_shutdown(self): + """ + REQUIRES: + TESTS: shutdown + """ + print("\ntest_shutdown") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + msg = Message(arbitration_id=0x01E2, data=[12, 255, 29, 112]) + bus.shutdown() + with self.assertRaises(AttributeError): + bus.send(msg) + + def test_set_mode(self): + """ + REQUIRES: + TESTS: set_mode + """ + print("\ntest_set_mode") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + bus.set_mode("CANPREISO") + self.assertEqual(bus.mode, "CANPREISO") + bus.set_mode("CANFD") + self.assertEqual(bus.mode, "CANFD") + bus.set_mode("CAN") + self.assertEqual(bus.mode, "CAN") + finally: + bus.shutdown() + + def test_set_bitrate(self): + """ + REQUIRES: + TESTS: set_bitrate + """ + print("\ntest_set_bitrate") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + bus.set_bitrate(300000) + self.assertEqual(bus.bitrate, 300000) + finally: + bus.shutdown() + + def test_set_termination(self): + """ + REQUIRES: + TESTS: set_termination + """ + print("\ntest_set_termination") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + bus.set_termination(False) + self.assertEqual(bus.termination, False) + finally: + bus.shutdown() + + def test_set_databitr(self): + """ + REQUIRES: + TESTS: set_databitr + """ + print("\ntest_set_databitr") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=True, ip=self.ip) + try: + bus.set_databitr(1000000) + self.assertEqual(bus.data_bitrate, 1000000) + finally: + bus.shutdown() + + def test_event_rx(self): + """ + REQUIRES: + TESTS: set_event_rx + """ + print("\ntest_set_event_rx") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + bus.set_event_rx(False) + bus.set_event_rx(True) + self.assertEqual(bus.mode, "CAN") + finally: + bus.shutdown() + + def test_dbit_mode(self): + """ + REQUIRES: + TESTS: set_databitr, set_mode + """ + print("\ntest_dbit_mode") + bus = can.interface.Bus(bustype="dg", channel=1, is_fd=False, ip=self.ip) + try: + bus.set_databitr(500000) + bus.set_mode("CANFD") + self.assertEqual(bus.data_bitrate, 500000) + finally: + bus.shutdown() + + +if __name__ == "__main__": + unittest.main()