Messaging

Übersicht

Messaging ist eine zentrale Technologie für die Kommunikation von Anwendungen untereinander. Dabei wird eine lose Kopplung über sogenannte Messages etabliert, die über einen Message-Provider verteilt werden. Wichtig hierbei ist eine garantierte und nachvollziehbare Übermittlung der Messages. Als Beispiel für Message-Provider dienen die etablierten Applikationsserver der JEE oder separate Produktlösungen wie Apache ActiveMQ/Artemis.

Im Rahmen einer Service-orientierten Architektur und einer Umsetzung beispielsweise mit Microservices steigt die Anzahl der zu übermittelnden Nachrichten sehr schnell an, damit die Services sich untereinander synchronisieren können. Auch hier ist ein möglichst flexibler Ansatz notwendig, um den Konfigurations- und Administrationsaufwand beherrschen zu können. Nachdem Microservices auf Grund der dezentralen Datenhaltung im Normalfall auf einer BASE-Architektur beruhen ist hier die Garantie des Message-Transports zwar immer noch gewünscht, allerdings liegt der Schwerpunkt hier mehr auf einer horizontal skalierbaren Lösung, die eine immense Menge von Messages verteilen kann. Hier hat sich insbesondere in der Java-Welt das auf Streaming basierende Apache Kafka etabliert. 

Producer und Consumer

Um die Message-basierte Kommunikation zu veranschaulichen dient das folgende Bild aus der ActiveMQ-Dokumentation.

Producer und Consumer

Diese Bild ist leicht zu verstehen: Producer senden Messages an eine Queue, die dann die eingetroffenen Nachrichten an die registrierten Consumers verteilt.

Im klassischen Messaging werden hierzu zwei unterschiedliche Strategien unterstützt:

  • Beim Point-to-Point-Messaging wird die Nachricht an exakt einen Consumer übermittelt.
  • Bei Publish-Subscribe werden alle Consumer die Message erhalten. 

Bei Active MQ werden die Messages garantiert in einem Message Buffer auf Platte gespeichert und sind damit auch bei einem Neustart verfügbar. Weiterhin ist die Verweildauer der Message in der Queue prinzipiell beliebig lange möglich, so dass die Queue Lastspitzen ausgleichen kann bzw. bei ausgefallenen Consumern auch als Zwischenpuffer fungieren kann. 

Streaming

Streaming funktioniert etwas anders: Hier werden die Nachrichten als (potenziell unendlich langer) Stream von Eingangsdaten aufgefasst, die von einem Stream Processor transformiert und damit verarbeitet werden. Dieses Konzept wird von Apache Kafka umgesetzt:

Streaming mit Apache Kafka

Wie in obigem Bild, das der Apache Kafka Webseite entnommen ist, erkennbar ist unterstützt Kafka zusätzlich noch das etablierte Producer/Consumer-Konzept, ist aber noch zusätzlich in der Lage, über Connectors Datenbank-Systeme anzubinden. 

Durchsatz und Clusterbetrieb

ActiveMQ und Kafka sind beides Systeme, die auf hohen Nachrichtendurchsatz ausgerichtet sind. Wobei tausende von Nachrichten pro Sekunde für beide System noch nicht wirklich viel sind. 

Trotzdem lohnt es sich, die beiden Systeme im Hinblick auf den Durchsatz und die Skalierbarkeit im Cluster-Betrieb zu vergleichen.

ActiveMQ verliert durch die Art der Nachrichtenpersistierung im Vergleich zu Apache Kafka deutlich. Dies liegt an der Art der Ablage der Messages, die bei ActiveMQ viel Wert auf eine sichere Ablage in einem, wahrscheinlich Datei-basierten, Message-Store legt. Der Aufbau eines ActiveMQ-Clusters verlangt eine relativ komplexe Systemarchitektur: Jeder einzelne Knoten benötigt eine Active/Passive-Instanz und die Skalierung des Systems verlangt eine intensive und aufwändige Kommunikation der Knoten untereinander.

Apache Kafka ist im Gegensatz hierzu recht einfach: Der Cluster skaliert horizontal und unterstützt ein automatisches Scale Up/Down. Die Ausfallsicherheit erreicht der Cluster durch eine interne Replikation der Nachrichten an mehrere Knoten. 

Damit ist der Durchsatz und die Skalierbarkeit eines Kafka-Clusters wesentlich besser als ein äquivalentes ActiveMQ-System. Allerdings ist hierbei zu beachten, dass eine alleinige Betrachtung dieser beiden Features bei der Auswahl eines Produkts nicht ausreicht: Ein klassisches Messaging-System bietet mit Queues, Selectors, den verschiedenen Acknowledge-Modi, Redelivery und Dead Letter Queues viele interessante zusätzliche Dienste an, muss für deren Umsetzung dann aber auch mehr Aufwand betreiben.


Seminare zum Thema

Weiterlesen

Spring Data Couchbase

Mit Spring Data Couchbase kann eine Java-Applikation sehr einfach auf die Dokumente des Couchbase Servers zugreifen. 

Das im Folgenden erläuterte Beispiel ist unter https://github.com/Javacream/org.javacream.training.couchbase zu finden.

Das POM

Couchbase wird von der Spring Boot Community unterstützt und durch einen eigenen Starter realisiert:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase</artifactId>
</dependency>

Java-Document-Mapping

Die Struktur einer Java-Klasse wird über Couchbase-Annotationen in ein Dokument gemapped. Die folgende Klasse entspricht einer Airline aus dem travel-demo der Couchbase-Installation:

package org.javacream.training.couchbase.spring.data.travel;

import org.springframework.data.couchbase.core.mapping.Document;

import com.couchbase.client.java.repository.annotation.Field;
import com.couchbase.client.java.repository.annotation.Id;

@Document
public class Airline {

	@Id
	private String id;

	@Field
	private String type;

	@Field
	private String name;

	@Field("iata")
	private String iataCode;

	
	@Field
	private String icao;

	@Field
	private String callsign;

	@Field
	private String country;

        //getter und setter
}

Das Repository

Das Repository wird wie bei Spring Data üblich als Interface realisiert:

package org.javacream.training.couchbase.spring.data.travel;

import java.util.List;

import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
import org.springframework.data.couchbase.core.query.View;
import org.springframework.data.couchbase.core.query.ViewIndexed;
import org.springframework.data.repository.CrudRepository;

/**
 * Repository interface to manage {@link Airline} instances.
 *
 */
@N1qlPrimaryIndexed
@ViewIndexed(designDoc = "airlines")
public interface TravelRepository extends CrudRepository<Airline, String> {

	/**
	 * Derived query selecting by {@code iataCode}.
	 *
	 * @param code
	 * @return
	 */
	Airline findAirlineByIataCode(String code);

	/**
	 * Query method using {@code airlines/all} view.
	 *
	 * @return
	 */
	@View(designDocument = "airlines", viewName = "allAirlines")
	List<Airline> findAllAirlinesBy();
}

Die Anwendung

Die Spring Boot-Applikation wird über die application.properties konfiguriert:


spring.couchbase.bucket.name=travel-sample
spring.couchbase.bootstrap-hosts=localhost
spring.couchbase.username=*****
spring.couchbase.password=*****

Die Anwendung benutzt dann das Repository, um mit der Couchbase zu kommunizieren.


Seminare zum Thema

Weiterlesen

Installation des Couchbase Servers

Die Installation des Couchbase Servers, eines Dokumenten-orientierten Datenbanksystems, ist durch die Verwendung eines Docker-Images einfach durchzuführen. 

Dazu wird das zugehörige offizielle Image vom Docker Hub geladen:

docker pull couchbase

Als Datenbank-Server muss der daraus erzeugte Container natürlich durch Port-Mapping den Zugriff auf die internen Server-Endpoints gewährleisten. Sollen die Daten einen Upgrade des Containers überstehen, was in den allermeisten Fällen gewünscht ist, so wird ein Volume oder einen Mount verwendet:

docker run -d -p 8091-8093:8091-8093 -p 11210:11210 --name db -v couchbase_volume:/opt/couchbase/var couchbase

Beim ersten Aufruf der Web Konsole über 

http://localhost:8091

kann die Installation angeschlossen werden. Hierbei werden der Clustername sowie der Administrator-Account angelegt. Der Couchbase Server ist damit einsatzbereit.

Die Couchbase Web Console

 


Seminare zum Thema

Weiterlesen

Dokumenten-orientierte Datenbanksysteme

Dokumenten-orientierte Datenbanksysteme sind im Rahmen der  NoSQL-Bewegung entstanden und haben sich in den letzten Jahren zu stabilen und etablierten Produkten entwickelt. Im aktuellen Ranking von Datenbanksystemen steht mit der kommerziellen MongoDB ein Document-Store unter den Top 5, aber auch die in einer vollwertigen Community-Edition vorliegende Couchbase ist, wenn auch deutlich schwächer, vertreten.  

Was sind Dokumente?

Ein Dokument ist eine Datenstruktur, die ähnlich wie eine Datenbank-Tabelle einem definierten Schema genügt. Allerdings wird dieses Schema von der Datenbank in der Regel nicht über Constraints beim Schreiben des Datensatzes geprüft (“Schema on write”), sondern erst bei Abfragen (“Schema on read”):  Dokumente, die der Query entsprechen, werden in die Treffermenge aufgenommen, nicht-passende Dokumente eben nicht. Eine Dokumenten-orientierte Datenbank benötigt deshalb nicht unbedingt verschiedene Tabellen-Definitionen, sondern kann alle Dokumente in einer einzigen Collection oder einem “Bucket” ablegen.

So können beispielsweise in einer Datenbank sämtliche Dokumente einer Reiseagentur (Fluglinien, Flughäfen, aber auch Flugpläne und Routen) gemeinsam abgelegt werden.

Dokumente werden über eine innerhalb der Datenbank eindeutigen Document-ID identifiziert.

Links versus Joins

Im Gegensatz zu einem relationalen Modell unterstützen Dokumente Server-seitige Joins nicht unbedingt. Es ist eher üblich, Dokumente zu Verlinken und damit im Endeffekt dem Client das Nachladen von Assoziationen zu überlassen. 

Dokumenten-Formate

Als Quasi-Standard für das Format von Dokumenten hat sich JSON herauskristallisiert. Dies ist einesteils etwas überraschend, da JSON bis heute keinen wirklichen Standard für Links definiert hat. Hier ist XML klar überlegen. Andererseits werden Dokumente sehr häufig im Rahmen einer RESTful Architektur benutzt, so dass in der Praxis als Implementierung http genutzt wird. Und dafür ist JSON die natürliche Wahl.

Clusterbetrieb

Dokumenten-orientierte Datenbanksysteme sind immer auf einen Cluster-Betrieb ausgerichtet. Das ergibt sich klar aus dem Bezug zur NoSQL-Bewegung und damit dem “Big Data”-Umfeld. 

Für die Umsetzung eines dynamisch skalierenden Clusters bieten sich zwei Strategien an:

  • Sharding: Hier werden die Dokumente auf Grund eines Sharding Keys, der nicht unbedingt der Document-ID entsprechen muss, auf die verschiedene Knoten des Clusters verteilt. Ein zentraler Master oder Router nimmt alle Anfragen entgegen und verteilt diese dann an Hand von Konfigurations-Informationen auf die Knoten, die die Daten enthalten. Die MongoDB ist ein Beispiel für diese Cluster-Architektur. Die folgende Abbildung entstammt der Dokumentation unter https://docs.mongodb.com/manual/core/sharded-cluster-components/:
    Ein einfacher Mongo Cluster
     
  • Ring Cluster: Hier wird auf den Master und den Konfigurationsserver verzichtet; jeder Knoten des Clusters ist gleichberechtigt. Ein Beispiel hierfür ist der Couchbase Server, siehe https://docs.couchbase.com/server/5.0/architecture/architecture-intro.html:
    Couchbase Cluster

 


Seminare zum Thema

Weiterlesen

Apache Ignite: Programmierung

Der Zugriff auf das Apache Ignite-Grid kann über verschiedene Protokolle erfolgen. Dazu werden Treiber-Bibliotheken für verschiedene Programmiersprachen angeboten. Daneben ist auch ein REST-API vorgesehen.

Nachdem Ignite selbst in Java realisiert ist bietet es sich an, auch für  Client-Anwendungen die Java-Plattform zu benutzen.

Direkter Zugriff auf einen Cache

Das Ansprechen eines Caches ist aus Sicht eines Programmierers nichts anderes als ein Map-Zugriff. Folglich ist das folgende Programm, das den Beispielprogrammen der Ignite-Community auf GitHub entnommen wurde, sehr trivial:

public class ClientPutGetExample {
    public static void main(String[] args) {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
        try (IgniteClient igniteClient = Ignition.startClient(cfg)) {
            final String CACHE_NAME = "put-get-example";
            ClientCache cache = igniteClient.getOrCreateCache(CACHE_NAME);
            Integer key = 1;
            Address val = new Address("1545 Jackson Street", 94612);
            cache.put(key, val);
            Address cachedVal = cache.get(key);
            System.out.println("Found address: " + cachedVal);
        }
        catch (ClientException e) {
            System.err.println(e.getMessage());
        }
        catch (Exception e) {
            System.err.format("Unexpected failure: %s\n", e);
        }
    }
}

Das SQL-Grid

Obwohl Ignite ein Key-Value-Store ist werden auch SQL-Abfragen unterstützt. Für die Java-Programmierung stellt sich Ignite damit als relationales Datenbank-System dar, das über einen JDBC-Treiber angesprochen wird:

 
public class SqlJdbcExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("CREATE TABLE city (id LONG PRIMARY KEY, name VARCHAR) " +
                    "WITH \"template=replicated\"");

                stmt.executeUpdate("CREATE TABLE person (id LONG, name VARCHAR, city_id LONG, " +
                    "PRIMARY KEY (id, city_id)) WITH \"backups=1, affinity_key=city_id\"");
                stmt.executeUpdate("CREATE INDEX on Person (city_id)");
            }
            try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO city (id, name) VALUES (?, ?)")) {
                stmt.setLong(1, 1L);
                stmt.setString(2, "Forest Hill");
                stmt.executeUpdate();
                stmt.setLong(1, 2L);
                stmt.setString(2, "Denver");
                stmt.executeUpdate();
                stmt.setLong(1, 3L);
                stmt.setString(2, "St. Petersburg");
                stmt.executeUpdate();
            }
            // Populate Person table with PreparedStatement.
            try (PreparedStatement stmt =
                conn.prepareStatement("INSERT INTO person (id, name, city_id) values (?, ?, ?)")) {
                stmt.setLong(1, 1L);
                stmt.setString(2, "John Doe");
                stmt.setLong(3, 3L);
                stmt.executeUpdate();
                stmt.setLong(1, 2L);
                stmt.setString(2, "Jane Roe");
                stmt.setLong(3, 2L);
                stmt.executeUpdate();
                stmt.setLong(1, 3L);
                stmt.setString(2, "Mary Major");
                stmt.setLong(3, 1L);
                stmt.executeUpdate();
                stmt.setLong(1, 4L);
                stmt.setString(2, "Richard Miles");
                stmt.setLong(3, 2L);
                stmt.executeUpdate();
            }
            try (Statement stmt = conn.createStatement()) {
                try (ResultSet rs =
                    stmt.executeQuery("SELECT p.name, c.name FROM Person p INNER JOIN City c on c.id = p.city_id")) {
                    while (rs.next())
                        System.out.println(rs.getString(1) + ", " + rs.getString(2));
                }
            }
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("DROP TABLE Person");
                stmt.executeUpdate("DROP TABLE City");
            }
        }
    }
}

Zu beachten ist, dass die Ablage der Daten immer noch in einem Key-Value-Store erfolgt: Die “Tabellen” sind damit Caches!

Weitere Grids

Ignite bietet noch weitere Grids an:

  • Das Compute-Grid ermöglicht die Ausführung einer beliebigen Programm-Sequenz auf den Knoten des Ignite-Clusters. Dazu überträgt der Java-Client beispielsweise eine Lambda-Funktion oder ein Runnable-Objekt. Typischerweise sind diese Programme als Map-Reduce-Funktionen ausgebildet, die auf den in den Caches gespeicherten Daten operieren.
  • Mit dem Service-Grid werden auf den Cluster Services installiert.  Damit wird Ignite faktisch zu einem Applikationsserver.
  • Eine Sonderform des Compute-Grids ist Machine Learning: Die dazu benötigten Algorithmen werden als Bestandteil der Ignite-Distribution fertig implementiert zur Verfügung gestellt. 

Seminare zum Thema

Weiterlesen