Java Message Service

Spis rzeczy


1. Pojęcia ogólne

1.1 Przybliżenie pojęcia "Messaging"

"Messaging" jest sposobem komunikacji pomiędzy częściami oprogramowania lub całymi aplikacjami.
Umożliwia rozproszoną komunikację pomiędzy luźno połączonymi elementami (loosely coupled).
Obiekty wysyłający i odbierający komunikat nie muszą być osiągalne w tym samym momencie.
Co więcej nie muszą wiedzieć o sobie czegokolwiek.
Obiekt wysyłający (odbierający) musi wiedzieć dokąd wysłać (skąd odebrać) komunikat (destination) oraz musi znać format komunikatu.

Takie podejście różni się od innych technik komunikacji.
Na przykład Remote Method Invocation (RMI) wymaga wiedzy o metodach udostępnianych przez zdalną aplikację (tightly coupled).
Z kolei format poczty elektronicznej służy do kontaktu pomiędzy ludźmi lub do kontaktu aplikacji z człowiekiem (rzadko odwrotnie).

1.2 Java Message Service API - cóż to jest?

JMS API umożliwia aplikacjom tworzenie, wysyłanie, odbiór i odczyt komunikatów.
Udostępnia zestaw interfejsów ze ściśle określoną semantyką, które umożliwiają programom napisanym w Jawie komunikowanie się z innymi aplikacjami używającymi messagingu.
JMS API oprócz tego, że jest "loosely coupled" to udostępnia dodatkową funkcjonalność w postaci

Pełną specyfikację można uzyskać pod adresem http://java.sun.com/products/jms/.

1.3 Kiedy powinniśmy stosować JMS API?

Jeżeli nasz projekt systemu zakłada jeden z poniższych warunków to powinniśmy się zastanowić nad wykorzystaniem Java Message Service:

Przykładem w którym możliwe jest użycie JMS API jest aplikacja dla wytwórcy samochodów, spełniająca poniższą specyfikację:

Poniższy obrazek ilustruje to jak odbywa się komunikacja w przedsiębiorstwie.
Komunikacja pomiędzy komponentami w przedsiębiorstwie

2. Charakterystyka JMS API

2.1. Architektura JMS API

Aplikacja JMS składa się z następujących części:

Obrazek poniżej ilustruje w jaki sposób poszczególne części współdziałają ze sobą.
architektura JMS API

2.2. Rodzaje komunikacji (Messaging Domains)

Specyfikacja JMS API udostępnia dwa rodzaje komunikacji

2.2.1 point-to-point (PTP)

PTP jest oparta na koncepcji kolejek komunikatów. Klient wysyła komunikat do określonej kolejki a inny klient może odebrac komunikat z tej kolejki.
Kolejka przetrzymuje komunikaty dopóty nie zostaną one odebrane (lub się nie przeterminują).
Każdy komunikat ma co najwyżej jednego odbiorcę.
Klient wysyłający i klient odbierający mogą działać niezależnie; w szczególności komunikat może zostać wysłany nawet wtedy, gdy odbiorca jeszcze nie istnieje.
Odbiorca informuje o prawidłowym pobraniu komunikatu.

Poniższy rysunek odzwierciedla ideę komunikacji PTP
komunikacja point-to-point

2.2.2 publish/subscribe (pub/sub)

W systemach stosujących komunikację (pub/sub), klient wysyła komunikat "na dany temat" (to a topic).
Może istnieć wielu klientów publikujących komunikaty (publishers) oraz odbierających komunikaty (subscribers).
"Subscriber" ma dostęp jedynie do tych komunikatów, które pojawiły się podczas gdy był zarejestrowany u providera (jest jednak możliwość zastosowania trwałej subskrypcji - durable subscription).
Komunikat istnieje tyle ile potrzeba - tzn. jeśli nie ma klienta, który mógłby go odebrać to przestaje być publikowany.

Poniższy rysunek ilustruje zależności w modelu (pub/sub)
komunikacja publish/subscribe

2.3. Sposoby pobierania komunikatów (Message Consumption)

Specyfikacja JMS określa dwa sposoby pobrania komunikatu przez klienta


3. Model programowania przy użyciu JMS API

Części składowe aplikacji JMS

Poniższy rysunek przedstawia miejsce i zależności pomiędzy w.w. obiektami
Model programowania JMS

3.1. Administered objects

"Connection factories" oraz "destinations" są obiektami administrowalnymi (nie programowalnymi).
Jako, że ich interfejs jest ustalony to klient może używac róznych ich implementacji bez obawy o przenośność.
J2EE SDK udostępnia nam narzędzie j2eeadmin, które obsługuje zadania administracyjne.

3.1.1. connection factories

"Connection factory" jest to obiekt służący klientowi do ustanawiania połączeń z providerem. Zawiera parametry konfiguracyjne połączenia zdefiniowane przez administratora.
W J2EE SDK mamy domyślnie skonfigurowaną parę obiektów "connection factory": QueueConnectionFactory oraz TopicConnectionFactory.
Możemy stworzyć nową "fabrykę połączeń" w następujący sposób:

j2eeadmin -addJmsFactory <jndi_name> queue
j2eeadmin -addJmsFactory <jndi_name> topic

Zwykle na początku programu klienckiego poszukujemy odpowiedniej "connection factory". Na przykład poniższy kawałek kodu wyszukuje je po nazwie:

Context ctx = new InitialContext();

QueueConnectionFactory queueConnectionFactory = 
  (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");

TopicConnectionFactory topicConnectionFactory = 
  (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");

3.1.2. destinations

"Destination" jest obiektem, którego klient używa w celu określenie miejsca, do którego ma wysyłać komunikaty bądź miejsca z którego ma je pobierać.
Przy pomocy komendy w J2EE SDK możemy utworzyć nasz obiekt "destination".

Dla PTP wygląda to tak:

j2eeadmin -addJmsDestination <queue_name> queue

dla PUB/SUB jest analogicznie:

j2eeadmin -addJmsDestination <topic_name> topic

Odpowiedni "topic" lub "queue" wyszukuje się wiele częściej niż "connection factory". Oto przykład:

Topic myTopic = (Topic) ctx.lookup("MyTopic");
Queue myQueue = (Queue) ctx.lookup("MyQueue");

3.2. Connections

"Connection" jest obiektem zapewniającym połączenie z JMS Providerem (takie wirtualne niskopoziomowe).
Tak jak "connection factory", "connection" ma jedną z dwóch postaci: QueueConnection lub TopicConnection.

QueueConnection queueConnection =
  queueConnectionFactory.createQueueConnection();

TopicConnection topicConnection = 
  topicConnectionFactory.createTopicConnection();

Kiedy aplikacja kończy działanie musimy zamknąć połączenia. Jesli ich nie zamkniemy to JMS Provider nie zwolni odpowiednich zasobów.
Robi się to w następujący sposób

queueConnection.close();

topicConnection.close();

Zamknięcie połączenia powoduje zakończenie pracy dla "Message producers" i "Message consumers".
Jeżeli chcemy tylko na jakiś czas zablokować połączenie używamy metody stop.
(Tutaj warto zaznaczyć, że otwieramy połączenie metodą start).

3.3. Sessions

Sesji używamy do tworzenia "Message producers", "Message consumers" i samych komunikatów.
Tak jak poprzednie obiekty, Sesja może mieć dwa wcielenia - "Topic" i "Queue". Oto przykład tworzenia "Session":

TopicSession topicSession = 
  topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

QueueSession queueSession = 
  queueConnection.createQueueSession(true, 0);

3.4. Message producers

Jest to obiekt stworzony przez obiekt "Session", którego celem jest wysyłanie komunikatów. Oczywiście może to robić na dwa sposoby:

QueueSender queueSender = queueSession.createSender(myQueue);

TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);

Jak widać możemy stworzyć "Sendera" lub "Publishera" i w zależności którą z metod komunikacji chcemy się posłużyć wywołujemy odpowiednią metodę:

queueSender.send(message);

topicPublisher.publish(message);

O tym co to jest "Message" dowiemy się w punkcie 3.6.

3.5. Message consumers

Jest to obiekt stworzony przez obiekt "Session". Odbiera komunikaty wysyłane do "destination".
"Message consumer" umożliwia zarejestrowanie u JMS providera zapotrzebowania na komunikaty z danego "destination".
JMS provider od tej pory będzie dostarczał naszemu "Message consumerowi" komunikaty wysłane do "destination".
Przykład utworzenia "konsumentów" (jak zwykle dwa rodzaje komunikacji):

QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);

TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);

Po utworzeniu "konsumenta" jest on gotowy do odbierania komunikatów (pamiętać należy o wywołaniu metody start - patrz sekcja 3.2. Connections).
Metoda close dla QueueReceiver lub TopicSubscriber dezaktywuje "Message consumera".

Oto przykładowy kod synchronicznego pobrania komunikatu:

queueConnection.start();
Message m = queueReceiver.receive();

topicConnection.start();
Message m = topicSubscriber.receive(1000); // time out after a second

3.5.1. Message listeners

Jest to obiekt służący do asynchronicznej obsługi odbioru komunikatów.
Implementuje interfejs MessageListener, który zawiera jedną metodę - onMessage. W metodzie tej definiujemy akcję jaka ma być wykonana w razie przybycia komunikatu.
Przypuśćmy, że zdefiniowaliśmy klasę TopicListener implementująca powyższy interfejs. Możemy teraz zarejestrować tego "Message listenera":

TopicListener topicListener = new TopicListener();

topicSubscriber.setMessageListener(topicListener);

Dopiero wtedy gdy zarejestrujemy Listenera możemy wywołać metodę start na odpowienim obiekcie typu TopicConnection (lub QueueConnection). W przeciwnym razie możemy stracić część komunikatów.

3.5.2. Message selectors

Zdarza się, że chcemy odrzucić część komunikatów spełniających określone kryterium. Służą do tego "Message selectors".
Każdy selector jest typu String. Składnia jest podzbiorem "SQL92 conditional expression syntax".
Kryteria możemy określać jedynie na podstawie nagłówka komunikatu i jego właściwości (Properties).
Ważnym powodem, dla którego warto używać selektorów jest fakt, że są one przetwarzane po stronie providera, tak więc oszczędzamy na połączeniu.

3.6. Messages

Wreszcie dochodzimy do podmiotu naszych zmagań, czyli komunikatów. Ich struktura jest dosyć oczywista:

Nagłówek zawiera pola, z których większość jest predefiniowana bez udziału klienta (np. JMSMessageID).
Właściwości określają dodatkowe opcje komunikatu (na przykład używamy ich do tworzenia "Message selectors").
Ciało (Message Body) może mieć pięć różnych formatów:

oraz specjalny typ:

który jest używany wtedy, kiedy nie chcemy definiować "Message Body".

Podam przykład realizacji wysyłania i odbierania komunikatu:

Wysyłanie:

TextMessage message = queueSession.createTextMessage();
message.setText(msg_text);     // msg_text is a String
queueSender.send(message);

Odbieranie:

Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
    TextMessage message = (TextMessage) m;
    System.out.println("Reading message: " + message.getText());
} else {
    // Handle error
}