Online Transaction Processing interactions using a Virtuoso DBMS via the Jena RDF Framework

What

A simulation application for demonstrating how to perform transactional read-write operations to a Virtuoso DBMS using the Apache Jena framework.

Why

When performing OLTP (OnLine Transaction Processing) of large amounts of data it is vital to do so using transactions that are consistent with the principles of atomicity, consistency, isolation, and durability(a/k/a ACID) in conjunction with appropriate concurrency modalities (e.g., optimistic or pessimistic). Failure to do so will lead to DBMS integrity compromises and unexpected behaviour in regards to perceived performance and scale.

How

As RDF usage is becoming more mainstream its usage profile is evolving from decision support solely to a mix that now includes OLTP. In response to this reality, we’ve created a Jena program that uses the Virtuoso Native Jena Provider to perform OLTP operations against a Virtuoso instance.

The objective here is to enable developers understand and exercise the spectrum of modalities associated with the ACID characteristics of transaction and concurrency control.

This program has the following characteristics:

  • It’s a multi-threaded application i.e., multiple threads per connection
  • Uses 5 threads per connection by default
  • Each thread inserts 10,000,000 triples
  • The transaction chunk size is 20,000
  • All data is inserted to a named graph denoted by the IRI test:insert
  • At initialization all data associated with test:insert is cleared out
  • The test inserts the following random data groups:

http://www.openlink.com/person_%THREAD_ID%_%INCREMENTED_INT% [ http://www.w3.org/1999/02/22-rdf-syntax-ns#type http://www.openlink.com/person, http://www.openlink.com/lastLocationUpdate “2018-12-01”^^http://www.w3.org/2001/XMLSchema#date , http://www.openlink.com/hasCountry http://www.openlink.com/country_%THREAD_ID% , http://www.openlink.com/hasSkill http://www.openlink.com/skill_A_%RND_INT% , http://www.openlink.com/hasSkill http://www.openlink.com/skill_B_%RND_INT% , http://www.openlink.com/hasSkill http://www.openlink.com/skill_C_%RND_INT% , http://www.openlink.com/hasSkill http://www.openlink.com/skill_D_%RND_INT% , http://www.openlink.com/hasSkill http://www.openlink.com/skill_E_%RND_INT% ].

  • where THREAD_ID
    • ‘A’ - for thread 0
    • ‘B’ - for thread 1
    • ‘C’ - for thread 2
    • ‘D’ - for thread 3 and etc.
    • %RND_INT% - random integer between 1 and 100
    • %INCREMENTED_INT% integer from 0 to max_triples

As a simulator application, the following settings can be changed in the source file “Test_Perf.java” in regards to size of the load operations performed against the instance being tested:

    public static long max_triples = 10000000;         //max insert triples per thread
    public static int max_threads = 5;

    public static int chunk_size = 20000;
    public static String graph_name = "test:insert";

   // vm.setConcurrencyMode(VirtGraph.CONCUR_PESSIMISTIC);
   // vm.setConcurrencyMode(VirtGraph.CONCUR_OPTIMISTIC);

To use:
  • Ensure a suitable Java 8+ runtime is in place
  • Extract the contents of the Jena Bulk Load application archive to a location of your choice
  • From the test_perf directory, edit the mrun.sh script to set the connection attributes ( hostname, port number, username, password) of the target Virtuoso instance
  • Run the mrun.bat (Windows) or mrun.sh (Unix) script to compile and run the application
$ ls -l
total 28
-rw-rw-r-- 1 ubuntu ubuntu 6299 May  3 09:26 Test_Perf.class
-rw-rw-r-- 1 ubuntu ubuntu 8066 May  3 09:20 Test_Perf.java
drwxrwxr-x 2 ubuntu ubuntu 4096 Apr 27 19:36 lib
-rw-rw-r-- 1 ubuntu ubuntu  551 May  2 22:10 mrun.bat
-rwxr-xr-x 1 ubuntu ubuntu  513 Apr 27 21:17 mrun.sh
$

$ cat mrun.sh
export CLASSPATH=./lib/junit-4.5.jar:./lib/jena-arq-3.10.0.jar:./lib/jena-iri-3.10.0.jar:./lib/jena-core-3.10.0.jar:./lib/jena-core-3.10.0-tests.jar:./lib/jena-base-3.10.0.jar:./lib/virtjdbc4.jar:./lib/virt_jena3.jar:./lib/jcl-over-slf4j-1.7.25.jar:./lib/log4j-1.2.17.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-log4j12-1.7.25.jar:./lib/jena-shaded-guava-3.10.0.jar:./lib/libthrift-0.10.0.jar:./lib/httpclient-4.5.5.jar:./lib/commons-compress-1.17.jar:.

javac Test_Perf.java
java Test_Perf localhost 1111 dba dba
$

$ time sh mrun.sh
log4j:WARN No appenders could be found for logger (org.apache.jena.riot.RDFLanguages).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
==[Thread-2]== data prepared 20000
==[Thread-0]== data prepared 20000
==[Thread-4]== data prepared 20000
.
.
.
==[Thread-0]== data inserted 20000
==[Thread-0]== data prepared 20000
==[Thread-0]== data inserted 20000
==[Thread-0] DONE = 10000000

real 7m46.127s
user 3m29.205s
sys 0m5.537s
$

Test_Perf.java source code

import org.apache.jena.graph.NodeFactory;
import org.apache.jena.query.*;
import org.apache.jena.rdf.model.*;
import org.apache.jena.rdf.model.ResourceFactory;
import org.apache.jena.util.iterator.*;
import org.apache.jena.graph.*;
import org.apache.jena.shared.*;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.*;
import java.time.*;
import java.sql.SQLException;
import org.apache.jena.sparql.core.DatasetGraph ;
import virtuoso.jena.driver.VirtGraph;
import virtuoso.jena.driver.VirtModel;

public class Test_Perf extends Thread {

    public static final String VIRTUOSO_INSTANCE = "localhost";
    public static final String VIRTUOSO_PORT = "1111";
    public static final String VIRTUOSO_USERNAME = "dba";
    public static final String VIRTUOSO_PASSWORD = "dba";

    public static long max_triples = 10000000;         // max triples inserted per thread
    public static int max_threads = 5;                       // max threads

    public static int chunk_size = 20000;
    public static String graph_name = "test:insert";

    public static String instance = VIRTUOSO_INSTANCE;
    public static String port = VIRTUOSO_PORT;
    public static String uid = VIRTUOSO_USERNAME;
    public static String pwd = VIRTUOSO_PASSWORD;

    char pid;
    int id;

    public static void log(String mess) {
        System.out.println("   " + mess);
    }

    public static void main(String[] args) {

        if (args.length > 0)
          instance = args[0];

        if (args.length > 1)
          port = args[1];

        if (args.length > 2)
          uid = args[2];

        if (args.length > 3)
          pwd = args[3];

        VirtModel vm = null;
        try {
          vm = VirtModel.openDatabaseModel(graph_name, "jdbc:virtuoso://" + instance + ":" + port, uid, pwd);
          vm.removeAll();
        } catch (Exception e) {
          log(e.toString());
          return;
        } finally {
          if (vm != null)
            try {
              vm.close();
            } catch(Exception e) {}
        }

        Test_Perf[] tests = new Test_Perf[max_threads];

        char pid = 'A';

        for(int i=0; i < max_threads; i++) {
          tests[i] = new Test_Perf((char)(pid+i));
        }

        for(Test_Perf task : tests)
          task.start();

        try {
          for(Test_Perf task : tests) {
            task.join();
          }
        } catch(Exception e) {
          log(e.toString());
        }
    }


    public static int rnd_skill(){
      int max_rnd = 100;
      return (int) (Math.random() * max_rnd); 
    }
    

    public Test_Perf(char _pid) {
      this.pid = _pid;
    }



    public Model genModel() {
        Model m = ModelFactory.createDefaultModel();

        try {
          int i = 0;

          Property np1 = ResourceFactory.createProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#type");
          Property np2 = ResourceFactory.createProperty("http://www.openlink.com/lastLocationUpdate");
          Property np3 = ResourceFactory.createProperty("http://www.openlink.com/hasCountry");
          Property np4 = ResourceFactory.createProperty("http://www.openlink.com/hasSkill");

          Resource no1 = ResourceFactory.createResource("http://www.openlink.com/person");
          Resource no2 = ResourceFactory.createResource("\""+ LocalDate.now() +"\"^^http://www.w3.org/2001/XMLSchema#date");

          while(i < chunk_size) 
          {
            Resource ns = ResourceFactory.createResource("http://www.openlink.com/person_"+pid+"_"+id);
            id++;

            Resource no3 = ResourceFactory.createResource("http://www.openlink.com/country_"+pid);
            Resource no4 = ResourceFactory.createResource("http://www.openlink.com/skill_A"+rnd_skill());
            Resource no5 = ResourceFactory.createResource("http://www.openlink.com/skill_B"+rnd_skill());
            Resource no6 = ResourceFactory.createResource("http://www.openlink.com/skill_C"+rnd_skill());
            Resource no7 = ResourceFactory.createResource("http://www.openlink.com/skill_D"+rnd_skill());
            Resource no8 = ResourceFactory.createResource("http://www.openlink.com/skill_E"+rnd_skill());

            m.add(ResourceFactory.createStatement(ns, np1, no1)); i++;
            m.add(ResourceFactory.createStatement(ns, np2, no2)); i++;
            m.add(ResourceFactory.createStatement(ns, np3, no3)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no4)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no5)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no6)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no7)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no8)); i++;
          }

        } catch (Exception e) {
            log("==["+Thread.currentThread().getName()+"]***FAILED Test " + e);
            return null;
        }

        return m;
    }



    public void run() {

        VirtModel vm = null;
        long i = 0;

        try {
          vm = VirtModel.openDatabaseModel(graph_name, "jdbc:virtuoso://" + instance + ":" + port, uid, pwd);

          while(i < max_triples) {
            Model m = genModel();
            long sz = m.size();

            i += sz;

            log("==["+Thread.currentThread().getName()+"]== data prepared "+sz);

            while(true) {
              try {
                // vm.setConcurrencyMode(VirtGraph.CONCUR_PESSIMISTIC);
                // vm.setConcurrencyMode(VirtGraph.CONCUR_OPTIMISTIC);
                vm.begin().add(m).commit();
              } catch (Exception e) {
                Throwable ex = e.getCause();
                boolean deadlock = false;
                if ((ex instanceof SQLException) && ((SQLException)ex).getSQLState().equals("40001"))
                  deadlock = true;

                if (deadlock) {
                  log("==["+Thread.currentThread().getName()+"]== deadlock, rollback all and try insert again");
                  vm.abort();
                  continue;
                }

                throw e;
              }

              break;
            }
            log("==["+Thread.currentThread().getName()+"]== data inserted "+sz);
          
          }
        
        } catch (Exception e) {
            log("==["+Thread.currentThread().getName()+"]***FAILED Test " + e);
        } finally {
          if (vm != null)
            try {
              vm.close();
            } catch(Exception e) { }
        }

        log("==["+Thread.currentThread().getName()+"] DONE = "+i);
    }
 }

Git Repository

Note this application has now been added to a central public Github Repository of Virtuoso based Java Sample Applications, which can be cloned and the application run as detailed at Virtuoso_Java_Samples/Jena/Txn_Example

Related

Here’s a newer Jena3 simulation app variant that supports command line options for setting Isolation Levels and Concurrency Modes.

Example output log

$ ./mrun.sh
App options:
 % java Test_Perf_Jena hostname port_num uid pwd isolationMode concurrencyMode

 where 
   isolationMode: read_uncommitted | read_committed | repeatable_read | serializable 
                  default isolationMode = repeatable_read 

   concurrencyMode: default | optimistic | pessimistic 
                  default concurrencyMode = default 

Example of using:
 % java Test_Perf_Jena localhost 1111 dba dba repeatable_read default

===========================================================================

App will use next options
    hostname = localhost
        port = 1111
         UID = dba
         PWD = dba
   isolation = repeatable_read
 concurrency = default
===========================================================================

log4j:WARN No appenders could be found for logger (org.apache.jena.riot.RDFLanguages).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
...

Test_perf_Jena,java source code

import org.apache.jena.graph.NodeFactory;
import org.apache.jena.query.*;
import org.apache.jena.rdf.model.*;
import org.apache.jena.rdf.model.ResourceFactory;
import org.apache.jena.util.iterator.*;
import org.apache.jena.graph.*;
import org.apache.jena.shared.*;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.*;
import java.time.*;
import java.sql.SQLException;
import org.apache.jena.sparql.core.DatasetGraph ;
import virtuoso.jena.driver.VirtGraph;
import virtuoso.jena.driver.VirtModel;
import virtuoso.jena.driver.VirtDataset;
import virtuoso.jena.driver.VirtIsolationLevel;

public class Test_Perf_Jena extends Thread {

    public static final String VIRTUOSO_INSTANCE = "localhost";
    public static final String VIRTUOSO_PORT = "1111";
    public static final String VIRTUOSO_USERNAME = "dba";
    public static final String VIRTUOSO_PASSWORD = "dba";

    public static long max_triples = 10000000;         //max insert triples per thread
    public static int max_threads = 5;

    public static int chunk_size = 20000;
    public static String graph_name = "test:insert";
    public static String s_isolation = "repeatable_read";
    public static String s_concurrency = "default";

    public static String instance = VIRTUOSO_INSTANCE;
    public static String port = VIRTUOSO_PORT;
    public static String uid = VIRTUOSO_USERNAME;
    public static String pwd = VIRTUOSO_PASSWORD;

    public static VirtIsolationLevel isolation = VirtIsolationLevel.REPEATABLE_READ;
    public static int concurrency = VirtGraph.CONCUR_DEFAULT;

    char pid;
    int id;


    public static VirtIsolationLevel getIsolationLevel() {
       String s = s_isolation.toLowerCase();
       if (s.equals("read_uncommitted"))
         return VirtIsolationLevel.READ_UNCOMMITTED;
       else if (s.equals("read_committed"))
         return VirtIsolationLevel.READ_COMMITTED;
       else if (s.equals("repeatable_read"))
         return VirtIsolationLevel.REPEATABLE_READ;
       else if (s.equals("serializable"))
         return VirtIsolationLevel.SERIALIZABLE;
       else {
         s_isolation = "repeatable_read";
         return VirtIsolationLevel.REPEATABLE_READ;
       }
    }

    public static int getConcurrencyMode() {
       String s = s_concurrency.toLowerCase();
       if (s.equals("optimistic"))
         return  VirtGraph.CONCUR_OPTIMISTIC;
       else if (s.equals("pessimistic"))
         return VirtGraph.CONCUR_PESSIMISTIC;
       else {
         s_concurrency = "default";
         return VirtGraph.CONCUR_DEFAULT;
       }
    }


    public static void log(String mess) {
        System.out.println("   " + mess);
    }

    public static void main(String[] args) {

        if (args.length > 0)
          instance = args[0];

        if (args.length > 1)
          port = args[1];

        if (args.length > 2)
          uid = args[2];

        if (args.length > 3)
          pwd = args[3];

        if (args.length > 4)
          s_isolation = args[4];

        if (args.length > 5)
          s_concurrency = args[5];

        System.out.println("App options:");
        System.out.println(" % java Test_Perf_Jena hostname port_num uid pwd isolationMode concurrencyMode\n");
        System.out.println(" where ");
        System.out.println("   isolationMode: read_uncommitted | read_committed | repeatable_read | serializable ");
        System.out.println("                  default isolationMode = repeatable_read \n");
        System.out.println("   concurrencyMode: default | optimistic | pessimistic ");
        System.out.println("                  default concurrencyMode = default \n");

        System.out.println("Example of using:");
        System.out.println(" % java Test_Perf_Jena localhost 1111 dba dba repeatable_read default\n");

        System.out.println("===========================================================================\n");
        System.out.println("App will use next options");
        System.out.println("    hostname = "+instance);
        System.out.println("        port = "+port);
        System.out.println("         UID = "+uid);
        System.out.println("         PWD = "+pwd);
        System.out.println("   isolation = "+s_isolation);
        System.out.println(" concurrency = "+s_concurrency);
        System.out.println("===========================================================================\n");

        VirtDataset vds = null;
        try {
          vds = new VirtDataset("jdbc:virtuoso://" + instance + ":" + port, uid, pwd);
          vds.setIsolationLevel(isolation);
          Model vm = vds.getNamedModel(graph_name);
          vm.removeAll();
        } catch (Exception e) {
          log(e.toString());
          return;
        } finally {
          if (vds != null)
            try {
              vds.close();
            } catch(Exception e) {}
        }

        Test_Perf_Jena[] tests = new Test_Perf_Jena[max_threads];

        char pid = 'A';

        for(int i=0; i < max_threads; i++) {
          tests[i] = new Test_Perf_Jena((char)(pid+i));
        }

        for(Test_Perf_Jena task : tests)
          task.start();

        try {
          for(Test_Perf_Jena task : tests) {
            task.join();
          }
        } catch(Exception e) {
          log(e.toString());
        }

    }


    public static int rnd_skill(){
      int max_rnd = 100;
      return (int) (Math.random() * max_rnd); 
    }
    

    public Test_Perf_Jena(char _pid) {
      this.pid = _pid;
    }



    public Model genModel() {
        Model m = ModelFactory.createDefaultModel();

        try {
/**
http://www.beamery.com/person_A http://www.w3.org/1999/02/22-rdf-syntax-ns#type http://www.beamery.com/person.
http://www.beamery.com/person_A http://www.beamery.com/lastLocationUpdate "2018-12-01"^^http://www.w3.org/2001/XMLSchema#date.
http://www.beamery.com/person_A http://www.beamery.com/hasCountry http://www.beamery.com/country_A.
http://www.beamery.com/person_A http://www.beamery.com/hasSkill http://www.beamery.com/skill_A.
http://www.beamery.com/person_A http://www.beamery.com/hasSkill http://www.beamery.com/skill_B.
http://www.beamery.com/person_A http://www.beamery.com/hasSkill http://www.beamery.com/skill_C.

**/
          int i = 0;

          Property np1 = ResourceFactory.createProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#type");
          Property np2 = ResourceFactory.createProperty("http://www.beamery.com/lastLocationUpdate");
          Property np3 = ResourceFactory.createProperty("http://www.beamery.com/hasCountry");
          Property np4 = ResourceFactory.createProperty("http://www.beamery.com/hasSkill");

          Resource no1 = ResourceFactory.createResource("http://www.beamery.com/person");
          Resource no2 = ResourceFactory.createResource("\""+ LocalDate.now() +"\"^^http://www.w3.org/2001/XMLSchema#date");

          while(i < chunk_size) 
          {
            Resource ns = ResourceFactory.createResource("http://www.beamery.com/person_"+pid+"_"+id);
            id++;

            Resource no3 = ResourceFactory.createResource("http://www.beamery.com/country_"+pid);
            Resource no4 = ResourceFactory.createResource("http://www.beamery.com/skill_A"+rnd_skill());
            Resource no5 = ResourceFactory.createResource("http://www.beamery.com/skill_B"+rnd_skill());
            Resource no6 = ResourceFactory.createResource("http://www.beamery.com/skill_C"+rnd_skill());
            Resource no7 = ResourceFactory.createResource("http://www.beamery.com/skill_D"+rnd_skill());
            Resource no8 = ResourceFactory.createResource("http://www.beamery.com/skill_E"+rnd_skill());

            m.add(ResourceFactory.createStatement(ns, np1, no1)); i++;
            m.add(ResourceFactory.createStatement(ns, np2, no2)); i++;
            m.add(ResourceFactory.createStatement(ns, np3, no3)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no4)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no5)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no6)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no7)); i++;
            m.add(ResourceFactory.createStatement(ns, np4, no8)); i++;
          }

        } catch (Exception e) {
            log("==["+Thread.currentThread().getName()+"]***FAILED Test " + e);
            return null;
        }

        return m;
    }



    public void run() {

        VirtDataset vds = null;
        long i = 0;

        try {
          vds = new VirtDataset("jdbc:virtuoso://" + instance + ":" + port, uid, pwd);
          vds.setIsolationLevel(isolation);

          VirtModel vm = (VirtModel)vds.getNamedModel(graph_name);
          vm.setConcurrencyMode(concurrency);

          while(i < max_triples) {
            Model m = genModel();
            long sz = m.size();

            i += sz;

            log("==["+Thread.currentThread().getName()+"]== data prepared "+sz);

            while(true) {
              try {
                vm.begin().add(m).commit();
              } catch (Exception e) {
                Throwable ex = e.getCause();
                boolean deadlock = false;
                if ((ex instanceof SQLException) && ((SQLException)ex).getSQLState().equals("40001"))
                  deadlock = true;

                if (deadlock) {
                  log("==["+Thread.currentThread().getName()+"]== deadlock, rollback all and try insert again");
                  vm.abort();
                  continue;
                }

                throw e;
              }

              break;
            }
            log("==["+Thread.currentThread().getName()+"]== data inserted "+sz);
          
          }
        
        } catch (Exception e) {
            log("==["+Thread.currentThread().getName()+"]***FAILED Test " + e);
        } finally {
          if (vds != null)
            try {
              vds.close();
            } catch(Exception e) { }
        }

        log("==["+Thread.currentThread().getName()+"] DONE = "+i);
    }
 }