Outils pour utilisateurs

Outils du site


kivy_oscpy

Différences

Ci-dessous, les différences entre deux révisions de la page.

Lien vers cette vue comparative

Les deux révisions précédentesRévision précédente
Prochaine révision
Révision précédente
kivy_oscpy [2020/10/30 09:41] – Tag0 Added: kivy,protocole_reseau,python,sb sergekivy_oscpy [2021/08/05 07:30] (Version actuelle) – [Envoi d'un bundle avec un Client] serge
Ligne 1: Ligne 1:
-====== Kivy oscpy ======+====== Kivyoscpy ====== 
 +<WRAP center round box 60% centeralign> 
 +**{{tagpage>kivy|Toutes les pages Kivy}}** 
 +**[[http://translate.google.com/translate?hl=&sl=auto&tl=en&u=https%3A%2F%2Fressources.labomedia.org%2Fkivy_oscpy|English Version]]**
  
-{{tag>kivy protocole_reseau python sb}}+**[[les_pages_kivy_en_details|Les pages Kivy en détails]]** 
 +</WRAP> 
 +<WRAP center round box 60% centeralign> 
 +**A modern implementation of OSC for python 3**\\  
 +**C'est maintenu par les développeurs Kivy, c'est bien fait.** 
 +</WRAP> 
 + 
 +Ce package python fait partie du projet Kivy, mais il peut être utilisé indépendamment.  
 + 
 +===== Ressources ===== 
 + 
 +  * https://github.com/kivy/oscpy 
 +  * https://pypi.org/project/oscpy/ 
 + 
 +===== Installation ===== 
 + 
 +  sudo pip3 install oscpy 
 + 
 +=====Exemple tiré de ===== 
 +  * **[[https://github.com/sergeLabo/accelerometer_service_osc|accelerometer_service_osc]]** 
 + 
 +====Extraits==== 
 +Il n'est pas possible d'avoir 2 serveurs sur le même port sur une même machine, chaque script à un  serveur pour recevoir et un client pour envoyer. 
 + 
 +J'ai écrit quelque part que l'OSC ne supporte pas l'**UTF-8**. Si l'envoi et la réception se fait en python sans utiliser de bibliothèque tierce, il suffit d'encoder (en bytes) le string, ce qui est réalisé dans l'exemple ci-dessous avec<code python> "à@éèù".encode('utf-8')</code> et de décoder avec 
 +<code python>data.decode('utf-8')</code> 
 + 
 +===main.py=== 
 +[[https://github.com/sergeLabo/accelerometer_service_osc/blob/main/main.py|main.py]] 
 + 
 +<code python>     
 +class Accelerometer(BoxLayout): 
 + 
 +    def __init__(self, app): 
 +        super().__init__() 
 +        self.app = app 
 +        Clock.schedule_interval(self.update_display, 1/50) 
 +        ... 
 + 
 +    def update_display(self, dt): 
 +        root = self.app.get_running_app() 
 +        self.ids.activ_sensor.text = f"Capteur actif: {root.sensor}" 
 +         
 +    def on_activity(self, act): 
 +        """Appelé par on_release: root.on_activity(3) dans accelerometer.kv""" 
 +         
 +        r = self.app.get_running_app() 
 +        r.client.send_message(b'/activity', [act]) 
 +           
 +class AccelerometerApp(App): 
 + 
 +    def build(self): 
 +        ... 
 +        self.server = OSCThreadServer() 
 +        self.server.listen( address=b'localhost', 
 +                            port=3003, 
 +                            default=True) 
 +        ...   
 +        self.server.bind(b'/sensor', self.on_sensor) 
 +        self.client = OSCClient(b'localhost', 3001) 
 +         
 +    def on_sensor(self, sens): 
 +        self.sensor = sens.decode('utf-8'
 +</code> 
 + 
 +===service.py=== 
 +[[https://github.com/sergeLabo/accelerometer_service_osc/blob/main/service.py|service.py]] 
 + 
 +<code python> 
 +... 
 +from oscpy.client import OSCClient 
 +from oscpy.server import OSCThreadServer 
 +... 
 +class AccelerometerService: 
 +    ... 
 +    def init_osc(self): 
 +        """Le serveur peut envoyer 
 +        mais impossible d'avoir 2 serveurs sur le même port. 
 +        """ 
 +        self.server = OSCThreadServer() 
 +        self.server.listen('localhost', port=3001, default=True) 
 +        # Les callbacks du serveur 
 +        self.server.bind(b'/activity', self.on_activity) 
 +        self.server.bind(b'/stop', self.on_stop) 
 +        self.server.bind(b'/sensor_enable', self.on_sensor_enable) 
 +        # Un simple client 
 +        self.client = OSCClient(b'localhost', 3003) 
 +    ... 
 +    def on_activity(self, msg): 
 +        print("activity", msg) 
 +        self.activity = int(msg)  
 +    ... 
 +    def get_acceleration(self): 
 +        if self.status: 
 +            a, b, c = 0,0,0 
 +            ... 
 +            if self.sensor_enabled != 0: 
 +                # Set dans les arrays 
 +                self.acc_x[self.num] = a 
 +                self.acc_y[self.num] = b 
 +                self.acc_z[self.num] = c 
 +                self.acc_act[self.num] = self.activity 
 +                acc_message = [a, b, c, self.activity, self.num] 
 +                self.client.send_message(b'/acc', acc_message) 
 +    ...    
 +    def run(self): 
 +        while self.loop: 
 +            self.get_acceleration() 
 +            sleep(0.02) 
 +             
 +if __name__ == '__main__': 
 +    ACCELEROMETER = AccelerometerService() 
 +    ACCELEROMETER.run()             
 +</code> 
 + 
 +=====Bundle===== 
 + 
 +====Envoi d'un bundle avec un Client==== 
 +<code python osc.py> 
 +from oscpy.client import OSCClient 
 + 
 +class OscClient: 
 + 
 +    def __init__(self, **kwargs): 
 + 
 +        self.ip = kwargs.get('ip', None) 
 +        self.port = kwargs.get('port', None) 
 + 
 +        self.client = OSCClient(self.ip, self.port) 
 + 
 +    def send_depth(self, depth): 
 +        self.client.send_message(b'/depth', [depth]) 
 + 
 +    def send_bundle(self, messages): 
 +        bund = [] 
 + 
 +        for i, msg in enumerate(messages): 
 +            tag = ('/' + str(i)).encode('utf-8'
 +            print(tag, msg) 
 +            bund.append([tag, msg]) 
 + 
 +        self.client.send_bundle(bund) 
 + 
 + 
 +if __name__ == "__main__": 
 + 
 +    messages = [[3.2, 3, 4], [55.6, 12, 80]] 
 +    cli = OscClient(**{'ip': '127.0.0.1', 'port': 8003}) 
 +    cli.send_bundle(messages) 
 +</code> 
 + 
 +<code bash> 
 +b'/0' [3.2, 3, 4] 
 +b'/1' [55.6, 12, 80] 
 +</code> 
 +====Réception d'un bundle sur un Server==== 
 +<code python osc_server_test.py> 
 +from time import sleep 
 +from oscpy.server import OSCThreadServer 
 + 
 +dico = {} 
 +def on_tag(*args): 
 +    print(args) 
 +    tag = int(args[0].decode('utf-8')[1:]) 
 +    print(tag) 
 +    dico[tag] = args[1:] 
 +    print(dico) 
 + 
 +def default_handler(*args): 
 +    print("default_handler", args) 
 + 
 +server = OSCThreadServer() 
 +server.listen(b'localhost', port=8003, default=True) 
 +server.default_handler = default_handler 
 + 
 +for i in range(10): 
 +    tag = ('/' + str(i)).encode('utf-8'
 +    server.bind(tag, on_tag, get_address=True) 
 + 
 +while 1: 
 +    sleep(0.1) 
 + 
 +</code> 
 + 
 +<code bash>python3 osc_server_test.py  
 +(b'/0', 3.200000047683716, 3, 4) 
 +
 +{0: (3.200000047683716, 3, 4)} 
 +(b'/1', 55.599998474121094, 12, 80) 
 +
 +{0: (3.200000047683716, 3, 4), 1: (55.599998474121094, 12, 80)} 
 +</code> 
 + 
 +{{tag>kivy osc protocole_reseau python sb}}
kivy_oscpy.1604050910.txt.gz · Dernière modification : 2020/10/30 09:41 de serge