PublicRepository.java

  1. /*
  2.  * Copyright (c) 2007-2017 MetaSolutions AB
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  *     http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */

  16. package org.entrystore.impl;

  17. import com.github.benmanes.caffeine.cache.Cache;
  18. import com.github.benmanes.caffeine.cache.Caffeine;
  19. import com.google.common.collect.Queues;
  20. import org.eclipse.rdf4j.model.IRI;
  21. import org.eclipse.rdf4j.model.Model;
  22. import org.eclipse.rdf4j.model.Resource;
  23. import org.eclipse.rdf4j.model.Value;
  24. import org.eclipse.rdf4j.model.ValueFactory;
  25. import org.eclipse.rdf4j.repository.Repository;
  26. import org.eclipse.rdf4j.repository.RepositoryConnection;
  27. import org.eclipse.rdf4j.repository.RepositoryException;
  28. import org.eclipse.rdf4j.repository.sail.SailRepository;
  29. import org.eclipse.rdf4j.sail.memory.MemoryStore;
  30. import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
  31. import org.entrystore.AuthorizationException;
  32. import org.entrystore.Context;
  33. import org.entrystore.ContextManager;
  34. import org.entrystore.Entry;
  35. import org.entrystore.EntryType;
  36. import org.entrystore.GraphType;
  37. import org.entrystore.PrincipalManager;
  38. import org.entrystore.config.Config;
  39. import org.entrystore.repository.RepositoryManager;
  40. import org.entrystore.repository.config.Settings;
  41. import org.slf4j.Logger;
  42. import org.slf4j.LoggerFactory;

  43. import java.io.File;
  44. import java.net.URI;
  45. import java.util.Date;
  46. import java.util.HashSet;
  47. import java.util.Iterator;
  48. import java.util.Queue;
  49. import java.util.Set;
  50. import java.util.concurrent.ConcurrentMap;

  51. /**
  52.  * @author Hannes Ebner
  53.  */
  54. public class PublicRepository {
  55.    
  56.     Logger log = LoggerFactory.getLogger(PublicRepository.class);
  57.    
  58.     private boolean rebuilding = false;
  59.    
  60.     private Repository repository;
  61.    
  62.     private RepositoryManager rm;
  63.    
  64.     private PrincipalManager pm;

  65.     private Thread entrySubmitter;

  66.     private final Cache<URI, Entry> postQueue = Caffeine.newBuilder().build();

  67.     private final Queue<Entry> deleteQueue = Queues.newConcurrentLinkedQueue();

  68.     private static final int BATCH_SIZE = 1000;

  69.     public class EntrySubmitter extends Thread {

  70.         @Override
  71.         public void run() {
  72.             while (!interrupted()) {
  73.                 postQueue.cleanUp();
  74.                 int batchCount = 0;

  75.                 if (postQueue.estimatedSize() > 0 || !deleteQueue.isEmpty()) {
  76.                     if (!deleteQueue.isEmpty()) {
  77.                         Set<Entry> entriesToRemove = new HashSet<>();
  78.                         synchronized (deleteQueue) {
  79.                             while (batchCount < BATCH_SIZE) {
  80.                                 Entry e = deleteQueue.poll();
  81.                                 if (e == null) {
  82.                                     break;
  83.                                 }
  84.                                 entriesToRemove.add(e);
  85.                                 batchCount++;
  86.                             }
  87.                         }
  88.                         if (batchCount > 0) {
  89.                             log.info("Removing " + batchCount + " entries from Public Repository, " + deleteQueue.size() + " entries remaining in removal queue");
  90.                             removeEntries(entriesToRemove);
  91.                         }
  92.                     }
  93.                     if (postQueue.estimatedSize() > 0) {
  94.                         Set<Entry> entriesToUpdate = new HashSet<>();
  95.                         synchronized (postQueue) {
  96.                             ConcurrentMap<URI, Entry> postQueueMap = postQueue.asMap();
  97.                             Iterator<URI> it = postQueueMap.keySet().iterator();
  98.                             while (batchCount < BATCH_SIZE && it.hasNext()) {
  99.                                 URI key = it.next();
  100.                                 Entry entry = postQueueMap.get(key);
  101.                                 postQueueMap.remove(key, entry);
  102.                                 if (entry == null) {
  103.                                     log.warn("Value for key " + key + " is null in Public Repository submit queue");
  104.                                 }
  105.                                 entriesToUpdate.add(entry);
  106.                                 batchCount++;
  107.                             }
  108.                         }
  109.                         postQueue.cleanUp();
  110.                         log.info("Sending " + entriesToUpdate.size() + " entries for update in Public Repository, " + postQueue.estimatedSize() + " entries remaining in post queue");
  111.                         updateEntries(entriesToUpdate);
  112.                     }
  113.                 } else {
  114.                     try {
  115.                         Thread.sleep(10000);
  116.                     } catch (InterruptedException ie) {
  117.                         log.info("Public Repository submitter got interrupted, shutting down submitter thread");
  118.                         return;
  119.                     }
  120.                 }
  121.             }
  122.         }

  123.     }

  124.     public PublicRepository(RepositoryManager rm) {
  125.         this.rm = rm;
  126.         this.pm = rm.getPrincipalManager();
  127.         Config config = rm.getConfiguration();

  128.         String storeType = config.getString(Settings.REPOSITORY_PUBLIC_TYPE, "memory").trim();
  129.         log.info("Public repository type: " + storeType);
  130.        
  131.         if (storeType.equalsIgnoreCase("memory")) {
  132.             this.repository = new SailRepository(new MemoryStore());
  133.         } else if (storeType.equalsIgnoreCase("native")) {
  134.             if (!config.containsKey(Settings.REPOSITORY_PUBLIC_PATH)) {
  135.                 log.error("Incomplete configuration of public repository");
  136.             } else {
  137.                 File path = new File(config.getURI(Settings.REPOSITORY_PUBLIC_PATH));
  138.                 String indexes = config.getString(Settings.REPOSITORY_PUBLIC_INDEXES);
  139.                 ((RepositoryManagerImpl) rm).checkAndUpgradeNativeStore(path, indexes);
  140.                 log.info("Public repository: using Native Store at {} with indexes {}", path, indexes);
  141.                
  142.                 NativeStore store = null;
  143.                 if (indexes != null) {
  144.                     store = new NativeStore(path, indexes);
  145.                 } else {
  146.                     store = new NativeStore(path);
  147.                 }
  148.                 if (store != null) {
  149.                     this.repository = new SailRepository(store);
  150.                 }
  151.             }
  152.         }
  153.        
  154.         if (this.repository == null) {
  155.             log.error("Failed to create public repository");
  156.             return;
  157.         }

  158.         try {
  159.             repository.init();
  160.         } catch (RepositoryException e) {
  161.             log.error(e.getMessage());
  162.         }

  163.         if (getTripleCount() == 0 ||
  164.                 "on".equalsIgnoreCase(config.getString(Settings.REPOSITORY_PUBLIC_REBUILD_ON_STARTUP, "off"))) {
  165.             rebuildRepository();
  166.         }

  167.         entrySubmitter = new PublicRepository.EntrySubmitter();
  168.         entrySubmitter.start();
  169.     }
  170.    
  171.     public RepositoryConnection getConnection() {
  172.         try {
  173.             return repository.getConnection();
  174.         } catch (RepositoryException e) {
  175.             log.error(e.getMessage());
  176.         }
  177.         return null;
  178.     }

  179.     public void enqueue(Entry entry) {
  180.         URI entryURI = entry.getEntryURI();
  181.         synchronized (postQueue) {
  182.             log.info("Adding document to update queue: " + entryURI);
  183.             postQueue.put(entryURI, entry);
  184.         }
  185.     }

  186.     public void remove(Entry entry) {
  187.         URI entryURI = entry.getEntryURI();
  188.         synchronized (deleteQueue) {
  189.             log.info("Adding entry to delete queue: " + entryURI);
  190.             deleteQueue.add(entry);
  191.         }
  192.     }

  193.     private void addEntry(Entry e, RepositoryConnection rc) throws RepositoryException {
  194.         if (isAdministrative(e)) {
  195.             return;
  196.         }
  197.         URI currentUser = pm.getAuthenticatedUserURI();
  198.         try {
  199.             pm.setAuthenticatedUserURI(pm.getGuestUser().getURI());
  200.             try {
  201.                 ValueFactory vf = repository.getValueFactory();
  202.                 IRI contextURI = vf.createIRI(e.getContext().getURI().toString());

  203.                 // entry
  204.                 /* DEACTIVATED
  205.                 Graph entryGraph = e.getGraph();
  206.                 URI entryNG = vf.createIRI(e.getEntryURI().toString());
  207.                 if (entryGraph != null) {
  208.                     rc.add(entryGraph, entryNG, contextURI);
  209.                 }
  210.                 */

  211.                 // metadata
  212.                 Model mdGraph = null;
  213.                 IRI mdNG = null;
  214.                 if (e.getLocalMetadata() != null) {
  215.                     mdGraph = e.getLocalMetadata().getGraph();
  216.                     mdNG = vf.createIRI(e.getLocalMetadataURI().toString());
  217.                 }

  218.                 // ext metadata
  219.                 Model extMdGraph = null;
  220.                 IRI extMdNG = null;
  221.                 if (e.getCachedExternalMetadata() != null) {
  222.                     extMdGraph = e.getCachedExternalMetadata().getGraph();
  223.                     extMdNG = vf.createIRI(e.getCachedExternalMetadataURI().toString());
  224.                 }

  225.                 // resource
  226.                 Model resGraph = null;
  227.                 IRI resNG = null;
  228.                 if (GraphType.Graph.equals(e.getGraphType()) && EntryType.Local.equals(e.getEntryType())) {
  229.                     resGraph = (Model) e.getResource();
  230.                     resNG = vf.createIRI(e.getResourceURI().toString());
  231.                 }

  232.                 if (mdGraph != null) {
  233.                     rc.add(mdGraph, mdNG, contextURI);
  234.                 }
  235.                 if (extMdGraph != null) {
  236.                     rc.add(extMdGraph, extMdNG, contextURI);
  237.                 }
  238.                 if (resGraph != null) {
  239.                     rc.add(resGraph, resNG, contextURI);
  240.                 }
  241.             } catch (AuthorizationException ae) {
  242.             }
  243.         } finally {
  244.             pm.setAuthenticatedUserURI(currentUser);
  245.         }
  246.     }

  247.     private void updateEntries(Set<Entry> entries) {
  248.         URI currentUser = pm.getAuthenticatedUserURI();
  249.         try {
  250.             pm.setAuthenticatedUserURI(pm.getGuestUser().getURI());
  251.             synchronized (repository) {
  252.                 RepositoryConnection rc = null;
  253.                 try {
  254.                     rc = repository.getConnection();
  255.                     rc.begin();
  256.                     for (Entry e : entries) {
  257.                         updateEntry(e, rc);
  258.                     }
  259.                     rc.commit();
  260.                 } catch (RepositoryException re) {
  261.                     try {
  262.                         if (rc != null) {
  263.                             rc.rollback();
  264.                         }
  265.                     } catch (RepositoryException re1) {
  266.                         log.error(re1.getMessage());
  267.                     }
  268.                     log.error(re.getMessage());
  269.                 } finally {
  270.                     if (rc != null) {
  271.                         try {
  272.                             rc.close();
  273.                         } catch (RepositoryException re2) {
  274.                             log.error(re2.getMessage());
  275.                         }
  276.                     }
  277.                 }
  278.             }
  279.         } finally {
  280.             pm.setAuthenticatedUserURI(currentUser);
  281.         }
  282.     }

  283.     private void updateEntry(Entry e, RepositoryConnection rc) throws RepositoryException {
  284.         if (e == null) {
  285.             return;
  286.         }
  287.        
  288.         // If entry is ResourceType.Context we update all its
  289.         // entries, just in case the ACL has changed
  290.         if (GraphType.Context.equals(e.getGraphType()) && EntryType.Local.equals(e.getEntryType())) {
  291.             String contextURI = e.getResourceURI().toString();
  292.             String id = contextURI.substring(contextURI.lastIndexOf("/") + 1);
  293.             Context context = rm.getContextManager().getContext(id);
  294.             if (context != null) {
  295.                 Set<URI> entries = context.getEntries();
  296.                 for (URI entryURI : entries) {
  297.                     if (entryURI != null) {
  298.                         try {
  299.                             updateEntry(rm.getContextManager().getEntry(entryURI), rc);
  300.                         } catch (AuthorizationException ae) {
  301.                             continue;
  302.                         }
  303.                     }
  304.                 }
  305.             }
  306.         } else {
  307.             log.debug("Processing entry: " + e.getEntryURI());
  308.             removeEntry(e, rc);
  309.             addEntry(e, rc);
  310.         }
  311.     }

  312.     private void removeEntries(Set<Entry> entries) {
  313.         URI currentUser = pm.getAuthenticatedUserURI();
  314.         try {
  315.             pm.setAuthenticatedUserURI(pm.getGuestUser().getURI());
  316.             synchronized (repository) {
  317.                 RepositoryConnection rc = null;
  318.                 try {
  319.                     rc = repository.getConnection();
  320.                     rc.begin();
  321.                     for (Entry e : entries) {
  322.                         removeEntry(e, rc);
  323.                     }
  324.                     rc.commit();
  325.                 } catch (RepositoryException re) {
  326.                     try {
  327.                         rc.rollback();
  328.                     } catch (RepositoryException re1) {
  329.                         log.error(re1.getMessage());
  330.                     }
  331.                     log.error(re.getMessage());
  332.                 } finally {
  333.                     if (rc != null) {
  334.                         try {
  335.                             rc.close();
  336.                         } catch (RepositoryException re2) {
  337.                             log.error(re2.getMessage());
  338.                         }
  339.                     }
  340.                 }
  341.             }
  342.         } finally {
  343.             pm.setAuthenticatedUserURI(currentUser);
  344.         }
  345.     }

  346.     private void removeEntry(Entry e, RepositoryConnection rc) throws RepositoryException {
  347.         PrincipalManager pm = e.getRepositoryManager().getPrincipalManager();
  348.         URI currentUser = pm.getAuthenticatedUserURI();
  349.         try {
  350.             // we need to be admin, in case the ACL has become
  351.             // more restrictive since adding the entry
  352.             pm.setAuthenticatedUserURI(pm.getAdminUser().getURI());
  353.             ValueFactory vf = repository.getValueFactory();
  354.             IRI contextURI = vf.createIRI(e.getContext().getURI().toString());

  355.             IRI entryNG = vf.createIRI(e.getEntryURI().toString());
  356.             IRI mdNG = vf.createIRI(e.getLocalMetadataURI().toString());
  357.             IRI resNG = vf.createIRI(e.getResourceURI().toString());
  358.             IRI extMdNG = null;

  359.             if (e.getExternalMetadataURI() != null) {
  360.                 extMdNG = vf.createIRI(e.getCachedExternalMetadataURI().toString());
  361.             }

  362.             if (extMdNG != null) {
  363.                 rc.remove(rc.getStatements((Resource) null, (IRI) null, (Value) null, false, entryNG, mdNG, extMdNG, resNG), contextURI, entryNG, mdNG, extMdNG, resNG);
  364.             } else {
  365.                 rc.remove(rc.getStatements((Resource) null, (IRI) null, (Value) null, false, entryNG, mdNG, extMdNG, resNG), contextURI, entryNG, mdNG, resNG);
  366.             }
  367.         } finally {
  368.             pm.setAuthenticatedUserURI(currentUser);
  369.         }
  370.     }

  371.     public void rebuildRepository() {
  372.         synchronized (repository) {
  373.             if (rebuilding) {
  374.                 log.warn("The public repository is already being rebuilt: ignoring additional rebuilding requests");
  375.                 return;
  376.             } else {
  377.                 rebuilding = true;
  378.             }
  379.         }

  380.         log.info("Rebuilding public repository");

  381.         synchronized (repository) {
  382.             RepositoryConnection rc = null;
  383.             try {
  384.                 rc = repository.getConnection();
  385.                 Date before = new Date();
  386.                 rc.begin();
  387.                 rc.clear();
  388.                 log.info("Clearing public repository took " + (new Date().getTime() - before.getTime()) + " ms");

  389.                 ContextManager cm = rm.getContextManager();
  390.                 Set<URI> contexts = cm.getEntries();

  391.                 for (URI contextURI : contexts) {
  392.                     String id = contextURI.toString().substring(contextURI.toString().lastIndexOf("/") + 1);
  393.                     Context context = cm.getContext(id);
  394.                     if (context != null) {
  395.                         log.info("Adding context " + contextURI + " to public repository");
  396.                         before = new Date();
  397.                         Set<URI> entries = context.getEntries();
  398.                         log.info("Fetching entries took " + (new Date().getTime() - before.getTime()) + " ms");
  399.                         before = new Date();
  400.                         Date timeTracker = new Date();
  401.                         long publicEntryCount = 0;
  402.                         long processedCount = 0;
  403.                         for (URI entryURI : entries) {
  404.                             if (entryURI != null) {
  405.                                 processedCount++;
  406.                                 if ((new Date().getTime() - timeTracker.getTime()) > 60000) {
  407.                                     if (processedCount > 0) {
  408.                                         log.debug("Average time per entry after " + (new Date().getTime() - before.getTime()) + " ms: " + ((new Date().getTime() - before.getTime()) / processedCount) + " ms");
  409.                                         timeTracker = new Date();
  410.                                     }
  411.                                 }
  412.                                 try {
  413.                                     Entry entry = cm.getEntry(entryURI);
  414.                                     if (entry == null) {
  415.                                         continue;
  416.                                     }
  417.                                     addEntry(entry, rc);
  418.                                     publicEntryCount++;
  419.                                 } catch (AuthorizationException ae) {
  420.                                     continue;
  421.                                 }
  422.                             }
  423.                         }
  424.                         log.info("Added " + publicEntryCount + " entries to public repository");
  425.                         log.info("Total time for context: " + (new Date().getTime() - before.getTime()) + " ms");
  426.                         if (entries.size() > 0) {
  427.                             log.debug("Total average time per entry: " + ((new Date().getTime() - before.getTime()) / entries.size()) + " ms");
  428.                         }
  429.                         log.info("Done processing context " + contextURI);
  430.                     }
  431.                 }
  432.                 rc.commit();
  433.             } catch (RepositoryException re) {
  434.                 try {
  435.                     rc.rollback();
  436.                 } catch (RepositoryException re1) {
  437.                     log.error(re1.getMessage());
  438.                 }
  439.                 log.error(re.getMessage());
  440.             } finally {
  441.                 try {
  442.                     rc.close();
  443.                 } catch (RepositoryException re) {
  444.                     log.error(re.getMessage());
  445.                 }
  446.                 log.info("Rebuild of public repository complete");
  447.                 log.info("Number of triples in public repository: " + getTripleCount());
  448.                 rebuilding = false;
  449.             }
  450.         }
  451.     }

  452.     private boolean isAdministrative(Entry e) {
  453.         GraphType gt = e.getGraphType();
  454.         if (GraphType.Graph.equals(gt) ||
  455.                 GraphType.String.equals(gt) ||
  456.                 GraphType.None.equals(gt) ||
  457.                 GraphType.List.equals(gt)) {
  458.             return false;
  459.         }
  460.         return true;
  461.     }

  462.     public long getTripleCount() {
  463.         long amountTriples = 0;
  464.         RepositoryConnection rc = null;
  465.         try {
  466.             rc = repository.getConnection();
  467.             amountTriples = rc.size();
  468.         } catch (RepositoryException re) {
  469.             log.error(re.getMessage());
  470.         } finally {
  471.             if (rc != null) {
  472.                 try {
  473.                     rc.close();
  474.                 } catch (RepositoryException e) {
  475.                     log.error(e.getMessage());
  476.                 }
  477.             }
  478.         }
  479.         return amountTriples;
  480.     }

  481.     public void shutdown() {
  482.         if (entrySubmitter != null) {
  483.             entrySubmitter.interrupt();
  484.         }

  485.         try {
  486.             repository.shutDown();
  487.         } catch (RepositoryException e) {
  488.             log.error("Error when shutting down public repository: " + e.getMessage());
  489.         }
  490.     }

  491. }