kivy_oscpy
Ceci est une ancienne révision du document !
Table des matières
Kivy: oscpy
A modern implementation of OSC for python 3
C'est maintenu par les développeurs Kivy, c'est bien fait.
Ce package python fait partie du projet Kivy, mais il peut être utilisé indépendamment.
Ressources
Installation
sudo pip3 install oscpy
Exemple tiré de
Extraits
Il n'est pas possible d'avoir 2 serveurs sur le même port sur une même machine, chaque script à un serveur pour recevoir et un client pour envoyer.
J'ai écrit quelque part que l'OSC ne supporte pas l'UTF-8. Si l'envoi et la réception se fait en python sans utiliser de bibliothèque tierce, il suffit d'encoder (en bytes) le string, ce qui est réalisé dans l'exemple ci-dessous avec
"à@éèù".encode('utf-8')
et de décoder avec
data.decode('utf-8')
main.py
class Accelerometer(BoxLayout): def __init__(self, app): super().__init__() self.app = app Clock.schedule_interval(self.update_display, 1/50) ... def update_display(self, dt): root = self.app.get_running_app() self.ids.activ_sensor.text = f"Capteur actif: {root.sensor}" def on_activity(self, act): """Appelé par on_release: root.on_activity(3) dans accelerometer.kv""" r = self.app.get_running_app() r.client.send_message(b'/activity', [act]) class AccelerometerApp(App): def build(self): ... self.server = OSCThreadServer() self.server.listen( address=b'localhost', port=3003, default=True) ... self.server.bind(b'/sensor', self.on_sensor) self.client = OSCClient(b'localhost', 3001) def on_sensor(self, sens): self.sensor = sens.decode('utf-8')
service.py
... from oscpy.client import OSCClient from oscpy.server import OSCThreadServer ... class AccelerometerService: ... def init_osc(self): """Le serveur peut envoyer mais impossible d'avoir 2 serveurs sur le même port. """ self.server = OSCThreadServer() self.server.listen('localhost', port=3001, default=True) # Les callbacks du serveur self.server.bind(b'/activity', self.on_activity) self.server.bind(b'/stop', self.on_stop) self.server.bind(b'/sensor_enable', self.on_sensor_enable) # Un simple client self.client = OSCClient(b'localhost', 3003) ... def on_activity(self, msg): print("activity", msg) self.activity = int(msg) ... def get_acceleration(self): if self.status: a, b, c = 0,0,0 ... if self.sensor_enabled != 0: # Set dans les arrays self.acc_x[self.num] = a self.acc_y[self.num] = b self.acc_z[self.num] = c self.acc_act[self.num] = self.activity acc_message = [a, b, c, self.activity, self.num] self.client.send_message(b'/acc', acc_message) ... def run(self): while self.loop: self.get_acceleration() sleep(0.02) if __name__ == '__main__': ACCELEROMETER = AccelerometerService() ACCELEROMETER.run()
Bundle
Envoi d'un bundle avec un Client
- osv.py
from oscpy.client import OSCClient class OscClient: def __init__(self, **kwargs): self.ip = kwargs.get('ip', None) self.port = kwargs.get('port', None) self.client = OSCClient(self.ip, self.port) def send_depth(self, depth): self.client.send_message(b'/depth', [depth]) def send_bundle(self, messages): bund = [] for i, msg in enumerate(messages): tag = ('/' + str(i)).encode('utf-8') print(tag, msg) bund.append([tag, msg]) self.client.send_bundle(bund) if __name__ == "__main__": messages = [[3.2, 1, 1], [55.6, 1, 0]] cli = OscClient(**{'ip': '127.0.0.1', 'port': 8003}) cli.send_bundle(messages)
python3 osc.py b'/0' [3.2, 1, 1] b'/1' [55.6, 1, 0]
Réception d'un bundle sur un Server
from time import sleep from oscpy.server import OSCThreadServer def on_default(*args): print(args) def default_handler(*args): print("default_handler", args) server = OSCThreadServer() server.listen(b'localhost', port=8003, default=True) server.default_handler = default_handler while 1: sleep(0.1)
python3 osc_server_test.py default_handler (b'/0', 3.200000047683716, 1, 1) default_handler (b'/1', 55.599998474121094, 1, 0)
kivy_oscpy.1628146339.txt.gz · Dernière modification : 2021/08/05 06:52 de serge