A modern implementation of OSC for python 3
C'est maintenu par les développeurs Kivy, c'est bien fait.
Ce package python fait partie du projet Kivy, mais il peut être utilisé indépendamment.
sudo pip3 install oscpy
Il n'est pas possible d'avoir 2 serveurs sur le même port sur une même machine, chaque script à un serveur pour recevoir et un client pour envoyer.
J'ai écrit quelque part que l'OSC ne supporte pas l'UTF-8. Si l'envoi et la réception se fait en python sans utiliser de bibliothèque tierce, il suffit d'encoder (en bytes) le string, ce qui est réalisé dans l'exemple ci-dessous avec
"à@éèù".encode('utf-8')
et de décoder avec
data.decode('utf-8')
class Accelerometer(BoxLayout): def __init__(self, app): super().__init__() self.app = app Clock.schedule_interval(self.update_display, 1/50) ... def update_display(self, dt): root = self.app.get_running_app() self.ids.activ_sensor.text = f"Capteur actif: {root.sensor}" def on_activity(self, act): """Appelé par on_release: root.on_activity(3) dans accelerometer.kv""" r = self.app.get_running_app() r.client.send_message(b'/activity', [act]) class AccelerometerApp(App): def build(self): ... self.server = OSCThreadServer() self.server.listen( address=b'localhost', port=3003, default=True) ... self.server.bind(b'/sensor', self.on_sensor) self.client = OSCClient(b'localhost', 3001) def on_sensor(self, sens): self.sensor = sens.decode('utf-8')
... from oscpy.client import OSCClient from oscpy.server import OSCThreadServer ... class AccelerometerService: ... def init_osc(self): """Le serveur peut envoyer mais impossible d'avoir 2 serveurs sur le même port. """ self.server = OSCThreadServer() self.server.listen('localhost', port=3001, default=True) # Les callbacks du serveur self.server.bind(b'/activity', self.on_activity) self.server.bind(b'/stop', self.on_stop) self.server.bind(b'/sensor_enable', self.on_sensor_enable) # Un simple client self.client = OSCClient(b'localhost', 3003) ... def on_activity(self, msg): print("activity", msg) self.activity = int(msg) ... def get_acceleration(self): if self.status: a, b, c = 0,0,0 ... if self.sensor_enabled != 0: # Set dans les arrays self.acc_x[self.num] = a self.acc_y[self.num] = b self.acc_z[self.num] = c self.acc_act[self.num] = self.activity acc_message = [a, b, c, self.activity, self.num] self.client.send_message(b'/acc', acc_message) ... def run(self): while self.loop: self.get_acceleration() sleep(0.02) if __name__ == '__main__': ACCELEROMETER = AccelerometerService() ACCELEROMETER.run()
from oscpy.client import OSCClient class OscClient: def __init__(self, **kwargs): self.ip = kwargs.get('ip', None) self.port = kwargs.get('port', None) self.client = OSCClient(self.ip, self.port) def send_depth(self, depth): self.client.send_message(b'/depth', [depth]) def send_bundle(self, messages): bund = [] for i, msg in enumerate(messages): tag = ('/' + str(i)).encode('utf-8') print(tag, msg) bund.append([tag, msg]) self.client.send_bundle(bund) if __name__ == "__main__": messages = [[3.2, 3, 4], [55.6, 12, 80]] cli = OscClient(**{'ip': '127.0.0.1', 'port': 8003}) cli.send_bundle(messages)
b'/0' [3.2, 3, 4] b'/1' [55.6, 12, 80]
from time import sleep from oscpy.server import OSCThreadServer dico = {} def on_tag(*args): print(args) tag = int(args[0].decode('utf-8')[1:]) print(tag) dico[tag] = args[1:] print(dico) def default_handler(*args): print("default_handler", args) server = OSCThreadServer() server.listen(b'localhost', port=8003, default=True) server.default_handler = default_handler for i in range(10): tag = ('/' + str(i)).encode('utf-8') server.bind(tag, on_tag, get_address=True) while 1: sleep(0.1)
python3 osc_server_test.py (b'/0', 3.200000047683716, 3, 4) 0 {0: (3.200000047683716, 3, 4)} (b'/1', 55.599998474121094, 12, 80) 1 {0: (3.200000047683716, 3, 4), 1: (55.599998474121094, 12, 80)}