microkitx-ws
v1.0.3
Published
The ws package for ezyli enterprise
Readme
GitHub URL: https://github.com/hartnel/ezyli-async-http
Create a fullWebsocket Agent
Implementation de WebsocketManager
Faire une classe singleton semblable à WebsocketManger.dart
qui a une methode connect prenant :
- wsUrl
- onMessage ou onData
- onError
- onClose
- onOpen
Quand on appelle cette methode, une connexion websocket est initialisé
une méthode send : qui prends un json pour envoyer. il faut faire des checks avant de d’envoyer (par exemple s’il y a une connexion ouverte, … )
Une methode dispose() qui ferme la connexion websocket courante.
voici un exemple en dart.
class WebSocketManager {
// a private constructor
WebSocketManager._();
static final WebSocketManager instance = WebSocketManager._();
WebSocketChannel? _channel; // or IOWebSocketChannel
// connect to the websocket server
WebSocketChannel? connect({
required String wsUrl,
required String accessToken,
required String channelName,
void Function()? onDone,
Function? onError,
required Function(dynamic) onData,
}) {
if (_channel != null) {
// if the channel is already connected, dispose it
dispose();
}
String url = '$wsUrl?token=$accessToken&channel_name=$channelName';
try {
_channel = IOWebSocketChannel.connect(
url,
pingInterval: Duration(seconds: 5),
);
_channel!.stream.listen(
(data) {
onData(data);
},
onDone: onDone,
onError: onError,
);
} catch (_) {}
return _channel;
}
// dispose
void dispose() {
_channel?.sink.close();
_channel = null;
}
}implementer WebSocketSubscription Cette classe décrit la requête d’un composant à écouter le web socket.
C’est une classe dont les instances peuvent être comparables.
Exemple WebSocketSubscription(1,2) = WebSocketSubscription(1,2).
Voici un exemple en Dart.
class WebSocketSubscription extends Equatable {
final String id;
final Function(Map<String, dynamic> data) callback;
final bool Function(Map<String, dynamic> data) shouldNotify;
WebSocketSubscription({
required this.id,
required this.callback,
required this.shouldNotify,
});
@override
List<Object?> get props => [id];
}Implementer WebsocketData
WebsocketData represente une donnée websocket. c’est un couple contenant le message et la date à laquelle elle a été envoyé (data, createdAt).
Voici un exemple en dart.
class WebsocketData extends Equatable {
final DateTime createdAt;
final Map<String, dynamic> data;
WebsocketData({
required this.data,
required this.createdAt,
});
@override
List<Object?> get props => [data, createdAt];
}Implementer WebsocketEvent
L’ensemble des opérations sur les messages websockets sont séquentielles. On peut donc avoir les instructions
-update_messages()
-delete_old_messages() d’ou l’implementation de la classe representant les operations sous la forme.
class WebsocketEvent {
final Function callback;
WebsocketEvent({
required this.callback,
});
}Implementer WebsockedEventLoop
Cette classe est responsable de l’exécution des opérations séquentielle sur les messages websockets.
Il empile donc les opérations dans la pile et les exécute les unes après les autres.
Voici une implémentation en Dart.
class _WebsockedEventLoop {
final _queue = Queue<WebsocketEvent>();
var _running = false;
void add(WebsocketEvent evt) {
_queue.add(evt);
if (!_running) {
_running = true;
_run();
}
}
void _run() {
while (_queue.isNotEmpty) {
WebsocketEvent event = _queue.removeFirst();
try {
event.callback();
} catch (e) {
log("Error occurred while executing callback: $e");
}
}
_running = false;
}
}Implementer WebSocketHandler
Cette classe est singleton et est la principale qui est chargé d'interagir avec le reste de votre application. Elle a donc des attributs et des méthodes specifiques.
les attributs
- la liste des subscriptions (les callabacks d'ecoute).
final List<WebSocketSubscription> _subscriptions = [];- la liste des messages websockets empilés.
final List<WebsocketData> _websocketDataList = [];- Le gestionnaire de l'execution sequentielle.
var eventLoop = _WebsockedEventLoop();- une callback (avec un setter) qui permet de déverser tout les messages brut websocket vers votre app, peut être pour une utilisation ulterieur comme pour les logger par exemple. il faut noter que l'utilisation de cette callback reviens à definir une subscription avec un filtre qui laisse tout passer.
Function(Map<String, dynamic> data)? onNewData;- une callback (avec un setters) qui est appelé quand l'etat du websocket change.
enum WebsocketState {
connecting,
connected,
disconnected,
error,
}
Function(WebsocketState state) onWsStatusChanged = (state) {};les méthodes
- Une methode listen qui permet à l'application de demarrer l'écoute du websocket.
listenForWebsocketEvents({
required url,
}) {}Elle est implementé suivant ces lignes de conduites :
-- verifier les conditions de bases : on ne peut pas demarrer l'ecoute pendant qu'une autre ecoute est en cour par exemple. ...
// log("[EVENT_LOG] init websocket ...");
// if not disconnected, then return
if (_websocketState != WebsocketState.disconnected) {
return;
}
-- correctement manager l'etat du websocket.
_websocketState = WebsocketState.connecting;-- une fois le websocket connecté,il faut demarer le hearbit (une fonction qui s'execute de facon periodique pour notifier le backend de la presence dans sur le reseau) ;
-- une fois qu'une donnée arrive, on appele onNewData et on declenche la notification des dependances
onNewData != null ? onNewData!(data) : null;
//notifyListeners
notifyListeners(
data: data,
createdAt: DateTime.now(),
);-- En cas d'ereur ou de fermeture de websocket, lancer la méthode de gestion de fermeture handleClose dans cet exemple, handleClose est plutot handleDone
onError: (error) {
log("[EVENT_LOG] onError error : $error");
handleDone(url, accessToken, channelName);
},
onDone: () {
handleDone(url, accessToken, channelName);
},Voici un exemple de cette fonction implementé en dart.
NB : Il peut y avoir des differences avec votre language
listenForWebsocketEvents({
required url,
required String accessToken,
required String channelName,
}) {
//set should reconnect to true
shouldReconnect = true;
//if empty accessToken or channelName, return
if (accessToken.isEmpty || channelName.isEmpty) {
return;
}
log("[EVENT_LOG] listenForWebsocketEvents :: Connecting : $channelName ... \n");
if (channel != null) {
// if the channel is already connected, dispose it
dispose();
}
_accessToken = accessToken;
_channelName = channelName;
_url = url;
// log("[EVENT_LOG] init websocket ...");
// if not disconnected, then return
if (_websocketState != WebsocketState.disconnected) {
return;
}
_websocketState = WebsocketState.connecting;
//log channelName
// log("[EVENT_LOG] Initializing _listenForWebsocketEvents : $channelName ... \n");
channel = WebSocketManager.instance.connect(
wsUrl: url,
accessToken: accessToken,
channelName: channelName,
onData: (res) {
// log("[EVENT_LOG] data : $res");
try {
final json = jsonDecode(res);
if (json is Map && json.containsKey('data')) {
Map<String, dynamic> data = json['data'];
String? event = data['event'];
bool isConnectionEvent = event == 'connected';
log("[EVENT_LOG] event : $event", name: "WebSocketHandler");
if (isConnectionEvent && channel != null) {
//register heart beat
registerHeartBeatFn?.call(sendHeartBeat);
//startHearBeat(Duration(seconds: 10));
_websocketState = WebsocketState.connected;
onWsStatusChanged(_websocketState);
}
// handleNotification(data, fromFcm: false);
onNewData != null ? onNewData!(data) : null;
//notifyListeners
notifyListeners(
data: data,
createdAt: DateTime.now(),
);
}
} catch (e) {
log("[EVENT_LOG] onData error : $e");
}
},
onError: (error) {
log("[EVENT_LOG] onError error : $error");
handleDone(url, accessToken, channelName);
},
onDone: () {
handleDone(url, accessToken, channelName);
},
);
if (channel == null) {
log("[EVENT_LOG] listenForWebsocketEvents :: channel is null");
handleDone(url, accessToken, channelName);
}
}- Une methode subscribe qui permet à une partie de votre application de souscrire à un type d'evenements websockets
il faut ici s'assurer de notifier les dependants quand un composant souscrit aux évènements. car peut être certains evenements le concernant sont arrivés avant qu'il ne subscribe. voici un exemple d'execution python
void subscribe(WebSocketSubscription sub) {
WebsocketEvent evt = WebsocketEvent(
callback: () {
log("[subscribe] ...");
if (!_subscriptions.contains(sub)) {
_subscriptions.add(sub);
}
},
);
eventLoop.add(evt);
// Here we are going to create a fake data to notifylisteners.
// This is to make sure that if its data already arrived before the subscription, it will be notified
// The fake data will be created 5 minutes ago so it is consumed immediately (considered expired)
DateTime now = DateTime.now();
DateTime timeToCheck = now.subtract(Duration(minutes: 5));
notifyListeners(data: {}, createdAt: timeToCheck);
}- La methode de unsubscribe. Voici un exemple en dart :
void unsubscribe(WebSocketSubscription sub) {
WebsocketEvent evt = WebsocketEvent(
callback: () {
log("[unsubscribe] ...");
if (_subscriptions.contains(sub)) {
_subscriptions.remove(sub);
}
},
);
eventLoop.add(evt);
}-- cancelSubscriptionById et cancelAllSubscriptions
void cancelSubscriptionById(String id) {
WebsocketEvent evt = WebsocketEvent(
callback: () {
log("[cancelSubscriptionById] ...");
_subscriptions.removeWhere((sub) => sub.id == id);
},
);
eventLoop.add(evt);
}
void cancelAllSubscriptions() {
// _subscriptions.clear();
WebsocketEvent evt = WebsocketEvent(
callback: () {
log("[cancelAllSubscriptions] ...");
_subscriptions.clear();
},
);
eventLoop.add(evt);
}-- une methode dispose qui ferme tout le necessaire
// dispose
void dispose() {
channel?.sink.close();
channel = null;
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
//remove heartbeat
unregisterHeartBeatFn?.call();
}
-- une methode qui coupe juste la connexion websocket
//void dispose and not reconnect
void disposeAndNotReconnect() {
shouldReconnect = false;
dispose();
}-- Une methode permetant d'envoyer le heartbit coté serveur. il s'agit juste d'une donnée
{'type': 'heartbeat', 'data': 'ping'}-- une methode pour notifier les dependances
il s'agit de declarer une instruction et l'envoyer à l'event loop
cette instruction consiste à
- checker tout les dependances et notifier si necessaire
- retirer tout les evenements qui ont deja été consomés
- retirer les evenements expirés
voici un exemple de cette fonction en dart
void notifyListeners({
required Map<String, dynamic> data,
required DateTime createdAt,
}) {
WebsocketEvent evt = WebsocketEvent(
callback: () {
//add to the list
if (data.isNotEmpty) {
_websocketDataList
.add(WebsocketData(data: data, createdAt: createdAt));
}
Set<WebsocketData> consumed = {};
// Notify listeners. for each entry that is consumed, remove the data from the list
for (var sub in _subscriptions) {
for (var elt in _websocketDataList) {
if (sub.shouldNotify(elt.data)) {
sub.callback(elt.data);
consumed.add(elt);
}
}
}
//detect expired
for (var elt in _websocketDataList) {
DateTime now = DateTime.now();
bool hasExpired =
now.difference(elt.createdAt).inSeconds > timeToLive.inSeconds;
if (hasExpired) {
consumed.add(elt);
}
}
// remove the consumed data
for (var data in consumed) {
_websocketDataList.remove(data);
}
},
);
eventLoop.add(evt);
}-- la methode qui gere la fermeture du websocket.
handleClose() qui va juste essayer de se reconnecter.
webSocketReconnectAttempts++;
listenForWebsocketEvents(
accessToken: accessToken,
channelName: channelName,
url: url,
);voici un exemple complet en dart.
void handleDone(String url, String accessToken, String channelName) {
log("[EVENT_LOG] listenForWebsocketEvents :: Disconnected error : $webSocketReconnectAttempts");
_websocketState = WebsocketState.disconnected;
// reconnect after delay
if (shouldReconnect) {
//set internet connection disturbing
onWsStatusChanged(_websocketState);
Future.delayed(Duration(seconds: delay), () {
webSocketReconnectAttempts++;
listenForWebsocketEvents(
accessToken: accessToken,
channelName: channelName,
url: url,
);
});
webSocketReconnectAttempts++;
} else {
webSocketReconnectAttempts = 0;
//unregister heartbeat
unregisterHeartBeatFn?.call();
_isSendingHeartBeat = false;
}
}- Handle async clients
Ici, nous supposons que vous avez deja un client http surchargé et assez complet pour prendre en compte les parametres tel que : onError, onTimeOut, requestTimeout, onSuccess
- Il faut ecrire une classe
AsyncReqConfigrepresentant les parametres d'execution d'une requête asynchrone. Voici l'exemple d'execution en dart.
class AsyncReqConfig {
//the callback that will be called to notify the man req maker that request is finish
bool Function({required Map<String, dynamic> Wsdata}) shouldNotifyfn;
// after request finish, do we need to execute a callback to retrieve same date and back to the view model ?
Future<RepoResponse> Function(Map<String, dynamic> data) callback;
//fn verify if it's verbose event
bool Function({required Map<String, dynamic> Wsdata})? isVerboseEventFn;
//verbose callback
void Function(Map<String, dynamic> data)? verboseCallback;
// original request
String requestId;
//onTimeOut
Function(RequestError error)? onTimeOut;
//requestTimeout
Duration requestTimeout;
//constructors with check
AsyncReqConfig({
required this.shouldNotifyfn,
required this.callback,
required this.requestId,
this.isVerboseEventFn,
this.verboseCallback,
this.onTimeOut,
this.requestTimeout = const Duration(seconds: 30),
}) {}
}-- requestTimeout : est le temps max que l'application met pour attendre une requête websocket
-- shouldNotifyfn : c'est une fonction retournant un bool, ca permet de definir la logique de detection de la reception de la reponse par websocket (la fin de la requête)
-- callback : il s'agit de la fonction qui est executé lorsque shouldNotifyfn a été positif c'est à dire une fois que la requête est terminée.
-- requestId : il s'agit de l'id de la requête.
-- isVerboseEventFn : il s'agit de la fonction qui permet de detecter si un evenement relatif à la requête est juste une information (tel que la progression d'une de l'execution par exemple).
-- verboseCallback : il s'agit de la fonction qui permet de dire quoi faire quand une notification par rapport à une requête arrive (par exemple appeler une callback ? ... notifier mon interface, ...)
-- onTimeOut : qu'est ce qu'on fait si on a pas pu recevoir la reponse par ws ?
- Ecrire la classe principale
AsyncHttpRepo
