PublicRepository.java
- /*
- * Copyright (c) 2007-2017 MetaSolutions AB
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.entrystore.impl;
- import com.github.benmanes.caffeine.cache.Cache;
- import com.github.benmanes.caffeine.cache.Caffeine;
- import com.google.common.collect.Queues;
- import org.eclipse.rdf4j.model.IRI;
- import org.eclipse.rdf4j.model.Model;
- import org.eclipse.rdf4j.model.Resource;
- import org.eclipse.rdf4j.model.Value;
- import org.eclipse.rdf4j.model.ValueFactory;
- import org.eclipse.rdf4j.repository.Repository;
- import org.eclipse.rdf4j.repository.RepositoryConnection;
- import org.eclipse.rdf4j.repository.RepositoryException;
- import org.eclipse.rdf4j.repository.sail.SailRepository;
- import org.eclipse.rdf4j.sail.memory.MemoryStore;
- import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
- import org.entrystore.AuthorizationException;
- import org.entrystore.Context;
- import org.entrystore.ContextManager;
- import org.entrystore.Entry;
- import org.entrystore.EntryType;
- import org.entrystore.GraphType;
- import org.entrystore.PrincipalManager;
- import org.entrystore.config.Config;
- import org.entrystore.repository.RepositoryManager;
- import org.entrystore.repository.config.Settings;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.File;
- import java.net.URI;
- import java.util.Date;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.Queue;
- import java.util.Set;
- import java.util.concurrent.ConcurrentMap;
- /**
- * @author Hannes Ebner
- */
- public class PublicRepository {
-
- Logger log = LoggerFactory.getLogger(PublicRepository.class);
-
- private boolean rebuilding = false;
-
- private Repository repository;
-
- private RepositoryManager rm;
-
- private PrincipalManager pm;
- private Thread entrySubmitter;
- private final Cache<URI, Entry> postQueue = Caffeine.newBuilder().build();
- private final Queue<Entry> deleteQueue = Queues.newConcurrentLinkedQueue();
- private static final int BATCH_SIZE = 1000;
- public class EntrySubmitter extends Thread {
- @Override
- public void run() {
- while (!interrupted()) {
- postQueue.cleanUp();
- int batchCount = 0;
- if (postQueue.estimatedSize() > 0 || !deleteQueue.isEmpty()) {
- if (!deleteQueue.isEmpty()) {
- Set<Entry> entriesToRemove = new HashSet<>();
- synchronized (deleteQueue) {
- while (batchCount < BATCH_SIZE) {
- Entry e = deleteQueue.poll();
- if (e == null) {
- break;
- }
- entriesToRemove.add(e);
- batchCount++;
- }
- }
- if (batchCount > 0) {
- log.info("Removing " + batchCount + " entries from Public Repository, " + deleteQueue.size() + " entries remaining in removal queue");
- removeEntries(entriesToRemove);
- }
- }
- if (postQueue.estimatedSize() > 0) {
- Set<Entry> entriesToUpdate = new HashSet<>();
- synchronized (postQueue) {
- ConcurrentMap<URI, Entry> postQueueMap = postQueue.asMap();
- Iterator<URI> it = postQueueMap.keySet().iterator();
- while (batchCount < BATCH_SIZE && it.hasNext()) {
- URI key = it.next();
- Entry entry = postQueueMap.get(key);
- postQueueMap.remove(key, entry);
- if (entry == null) {
- log.warn("Value for key " + key + " is null in Public Repository submit queue");
- }
- entriesToUpdate.add(entry);
- batchCount++;
- }
- }
- postQueue.cleanUp();
- log.info("Sending " + entriesToUpdate.size() + " entries for update in Public Repository, " + postQueue.estimatedSize() + " entries remaining in post queue");
- updateEntries(entriesToUpdate);
- }
- } else {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ie) {
- log.info("Public Repository submitter got interrupted, shutting down submitter thread");
- return;
- }
- }
- }
- }
- }
- public PublicRepository(RepositoryManager rm) {
- this.rm = rm;
- this.pm = rm.getPrincipalManager();
- Config config = rm.getConfiguration();
- String storeType = config.getString(Settings.REPOSITORY_PUBLIC_TYPE, "memory").trim();
- log.info("Public repository type: " + storeType);
-
- if (storeType.equalsIgnoreCase("memory")) {
- this.repository = new SailRepository(new MemoryStore());
- } else if (storeType.equalsIgnoreCase("native")) {
- if (!config.containsKey(Settings.REPOSITORY_PUBLIC_PATH)) {
- log.error("Incomplete configuration of public repository");
- } else {
- File path = new File(config.getURI(Settings.REPOSITORY_PUBLIC_PATH));
- String indexes = config.getString(Settings.REPOSITORY_PUBLIC_INDEXES);
- ((RepositoryManagerImpl) rm).checkAndUpgradeNativeStore(path, indexes);
- log.info("Public repository: using Native Store at {} with indexes {}", path, indexes);
-
- NativeStore store = null;
- if (indexes != null) {
- store = new NativeStore(path, indexes);
- } else {
- store = new NativeStore(path);
- }
- if (store != null) {
- this.repository = new SailRepository(store);
- }
- }
- }
-
- if (this.repository == null) {
- log.error("Failed to create public repository");
- return;
- }
- try {
- repository.init();
- } catch (RepositoryException e) {
- log.error(e.getMessage());
- }
- if (getTripleCount() == 0 ||
- "on".equalsIgnoreCase(config.getString(Settings.REPOSITORY_PUBLIC_REBUILD_ON_STARTUP, "off"))) {
- rebuildRepository();
- }
- entrySubmitter = new PublicRepository.EntrySubmitter();
- entrySubmitter.start();
- }
-
- public RepositoryConnection getConnection() {
- try {
- return repository.getConnection();
- } catch (RepositoryException e) {
- log.error(e.getMessage());
- }
- return null;
- }
- public void enqueue(Entry entry) {
- URI entryURI = entry.getEntryURI();
- synchronized (postQueue) {
- log.info("Adding document to update queue: " + entryURI);
- postQueue.put(entryURI, entry);
- }
- }
- public void remove(Entry entry) {
- URI entryURI = entry.getEntryURI();
- synchronized (deleteQueue) {
- log.info("Adding entry to delete queue: " + entryURI);
- deleteQueue.add(entry);
- }
- }
- private void addEntry(Entry e, RepositoryConnection rc) throws RepositoryException {
- if (isAdministrative(e)) {
- return;
- }
- URI currentUser = pm.getAuthenticatedUserURI();
- try {
- pm.setAuthenticatedUserURI(pm.getGuestUser().getURI());
- try {
- ValueFactory vf = repository.getValueFactory();
- IRI contextURI = vf.createIRI(e.getContext().getURI().toString());
- // entry
- /* DEACTIVATED
- Graph entryGraph = e.getGraph();
- URI entryNG = vf.createIRI(e.getEntryURI().toString());
- if (entryGraph != null) {
- rc.add(entryGraph, entryNG, contextURI);
- }
- */
- // metadata
- Model mdGraph = null;
- IRI mdNG = null;
- if (e.getLocalMetadata() != null) {
- mdGraph = e.getLocalMetadata().getGraph();
- mdNG = vf.createIRI(e.getLocalMetadataURI().toString());
- }
- // ext metadata
- Model extMdGraph = null;
- IRI extMdNG = null;
- if (e.getCachedExternalMetadata() != null) {
- extMdGraph = e.getCachedExternalMetadata().getGraph();
- extMdNG = vf.createIRI(e.getCachedExternalMetadataURI().toString());
- }
- // resource
- Model resGraph = null;
- IRI resNG = null;
- if (GraphType.Graph.equals(e.getGraphType()) && EntryType.Local.equals(e.getEntryType())) {
- resGraph = (Model) e.getResource();
- resNG = vf.createIRI(e.getResourceURI().toString());
- }
- if (mdGraph != null) {
- rc.add(mdGraph, mdNG, contextURI);
- }
- if (extMdGraph != null) {
- rc.add(extMdGraph, extMdNG, contextURI);
- }
- if (resGraph != null) {
- rc.add(resGraph, resNG, contextURI);
- }
- } catch (AuthorizationException ae) {
- }
- } finally {
- pm.setAuthenticatedUserURI(currentUser);
- }
- }
- private void updateEntries(Set<Entry> entries) {
- URI currentUser = pm.getAuthenticatedUserURI();
- try {
- pm.setAuthenticatedUserURI(pm.getGuestUser().getURI());
- synchronized (repository) {
- RepositoryConnection rc = null;
- try {
- rc = repository.getConnection();
- rc.begin();
- for (Entry e : entries) {
- updateEntry(e, rc);
- }
- rc.commit();
- } catch (RepositoryException re) {
- try {
- if (rc != null) {
- rc.rollback();
- }
- } catch (RepositoryException re1) {
- log.error(re1.getMessage());
- }
- log.error(re.getMessage());
- } finally {
- if (rc != null) {
- try {
- rc.close();
- } catch (RepositoryException re2) {
- log.error(re2.getMessage());
- }
- }
- }
- }
- } finally {
- pm.setAuthenticatedUserURI(currentUser);
- }
- }
- private void updateEntry(Entry e, RepositoryConnection rc) throws RepositoryException {
- if (e == null) {
- return;
- }
-
- // If entry is ResourceType.Context we update all its
- // entries, just in case the ACL has changed
- if (GraphType.Context.equals(e.getGraphType()) && EntryType.Local.equals(e.getEntryType())) {
- String contextURI = e.getResourceURI().toString();
- String id = contextURI.substring(contextURI.lastIndexOf("/") + 1);
- Context context = rm.getContextManager().getContext(id);
- if (context != null) {
- Set<URI> entries = context.getEntries();
- for (URI entryURI : entries) {
- if (entryURI != null) {
- try {
- updateEntry(rm.getContextManager().getEntry(entryURI), rc);
- } catch (AuthorizationException ae) {
- continue;
- }
- }
- }
- }
- } else {
- log.debug("Processing entry: " + e.getEntryURI());
- removeEntry(e, rc);
- addEntry(e, rc);
- }
- }
- private void removeEntries(Set<Entry> entries) {
- URI currentUser = pm.getAuthenticatedUserURI();
- try {
- pm.setAuthenticatedUserURI(pm.getGuestUser().getURI());
- synchronized (repository) {
- RepositoryConnection rc = null;
- try {
- rc = repository.getConnection();
- rc.begin();
- for (Entry e : entries) {
- removeEntry(e, rc);
- }
- rc.commit();
- } catch (RepositoryException re) {
- try {
- rc.rollback();
- } catch (RepositoryException re1) {
- log.error(re1.getMessage());
- }
- log.error(re.getMessage());
- } finally {
- if (rc != null) {
- try {
- rc.close();
- } catch (RepositoryException re2) {
- log.error(re2.getMessage());
- }
- }
- }
- }
- } finally {
- pm.setAuthenticatedUserURI(currentUser);
- }
- }
- private void removeEntry(Entry e, RepositoryConnection rc) throws RepositoryException {
- PrincipalManager pm = e.getRepositoryManager().getPrincipalManager();
- URI currentUser = pm.getAuthenticatedUserURI();
- try {
- // we need to be admin, in case the ACL has become
- // more restrictive since adding the entry
- pm.setAuthenticatedUserURI(pm.getAdminUser().getURI());
- ValueFactory vf = repository.getValueFactory();
- IRI contextURI = vf.createIRI(e.getContext().getURI().toString());
- IRI entryNG = vf.createIRI(e.getEntryURI().toString());
- IRI mdNG = vf.createIRI(e.getLocalMetadataURI().toString());
- IRI resNG = vf.createIRI(e.getResourceURI().toString());
- IRI extMdNG = null;
- if (e.getExternalMetadataURI() != null) {
- extMdNG = vf.createIRI(e.getCachedExternalMetadataURI().toString());
- }
- if (extMdNG != null) {
- rc.remove(rc.getStatements((Resource) null, (IRI) null, (Value) null, false, entryNG, mdNG, extMdNG, resNG), contextURI, entryNG, mdNG, extMdNG, resNG);
- } else {
- rc.remove(rc.getStatements((Resource) null, (IRI) null, (Value) null, false, entryNG, mdNG, extMdNG, resNG), contextURI, entryNG, mdNG, resNG);
- }
- } finally {
- pm.setAuthenticatedUserURI(currentUser);
- }
- }
- public void rebuildRepository() {
- synchronized (repository) {
- if (rebuilding) {
- log.warn("The public repository is already being rebuilt: ignoring additional rebuilding requests");
- return;
- } else {
- rebuilding = true;
- }
- }
- log.info("Rebuilding public repository");
- synchronized (repository) {
- RepositoryConnection rc = null;
- try {
- rc = repository.getConnection();
- Date before = new Date();
- rc.begin();
- rc.clear();
- log.info("Clearing public repository took " + (new Date().getTime() - before.getTime()) + " ms");
- ContextManager cm = rm.getContextManager();
- Set<URI> contexts = cm.getEntries();
- for (URI contextURI : contexts) {
- String id = contextURI.toString().substring(contextURI.toString().lastIndexOf("/") + 1);
- Context context = cm.getContext(id);
- if (context != null) {
- log.info("Adding context " + contextURI + " to public repository");
- before = new Date();
- Set<URI> entries = context.getEntries();
- log.info("Fetching entries took " + (new Date().getTime() - before.getTime()) + " ms");
- before = new Date();
- Date timeTracker = new Date();
- long publicEntryCount = 0;
- long processedCount = 0;
- for (URI entryURI : entries) {
- if (entryURI != null) {
- processedCount++;
- if ((new Date().getTime() - timeTracker.getTime()) > 60000) {
- if (processedCount > 0) {
- log.debug("Average time per entry after " + (new Date().getTime() - before.getTime()) + " ms: " + ((new Date().getTime() - before.getTime()) / processedCount) + " ms");
- timeTracker = new Date();
- }
- }
- try {
- Entry entry = cm.getEntry(entryURI);
- if (entry == null) {
- continue;
- }
- addEntry(entry, rc);
- publicEntryCount++;
- } catch (AuthorizationException ae) {
- continue;
- }
- }
- }
- log.info("Added " + publicEntryCount + " entries to public repository");
- log.info("Total time for context: " + (new Date().getTime() - before.getTime()) + " ms");
- if (entries.size() > 0) {
- log.debug("Total average time per entry: " + ((new Date().getTime() - before.getTime()) / entries.size()) + " ms");
- }
- log.info("Done processing context " + contextURI);
- }
- }
- rc.commit();
- } catch (RepositoryException re) {
- try {
- rc.rollback();
- } catch (RepositoryException re1) {
- log.error(re1.getMessage());
- }
- log.error(re.getMessage());
- } finally {
- try {
- rc.close();
- } catch (RepositoryException re) {
- log.error(re.getMessage());
- }
- log.info("Rebuild of public repository complete");
- log.info("Number of triples in public repository: " + getTripleCount());
- rebuilding = false;
- }
- }
- }
- private boolean isAdministrative(Entry e) {
- GraphType gt = e.getGraphType();
- if (GraphType.Graph.equals(gt) ||
- GraphType.String.equals(gt) ||
- GraphType.None.equals(gt) ||
- GraphType.List.equals(gt)) {
- return false;
- }
- return true;
- }
- public long getTripleCount() {
- long amountTriples = 0;
- RepositoryConnection rc = null;
- try {
- rc = repository.getConnection();
- amountTriples = rc.size();
- } catch (RepositoryException re) {
- log.error(re.getMessage());
- } finally {
- if (rc != null) {
- try {
- rc.close();
- } catch (RepositoryException e) {
- log.error(e.getMessage());
- }
- }
- }
- return amountTriples;
- }
- public void shutdown() {
- if (entrySubmitter != null) {
- entrySubmitter.interrupt();
- }
- try {
- repository.shutDown();
- } catch (RepositoryException e) {
- log.error("Error when shutting down public repository: " + e.getMessage());
- }
- }
- }