Source code for eds4jinja2.adapters.remote_sparql_ds
#!/usr/bin/python3
# remote_sparql_ds.py
# Date: 07/08/2020
# Author: Eugeniu Costetchi
# Email: costezki.eugen@gmail.com
import io
from pathlib import Path
from typing import Optional
from SPARQLWrapper import SPARQLWrapper, JSON, CSV
from py_singleton import singleton
from eds4jinja2.adapters.base_data_source import DataSource
import pandas as pd
from eds4jinja2.adapters.substitution_template import SubstitutionTemplate
DEFAULT_ENCODING = 'utf-8'
[docs]@singleton
class SPARQLClientPool(object):
"""
A singleton connection pool, that hosts a dictionary of endpoint_urls and
a corresponding SPARQLWrapper object connecting to it.
The rationale of this connection pool is to reuse connection objects and save time.
"""
connection_pool = {}
@staticmethod
def create_or_reuse_connection(endpoint_url: str):
if endpoint_url not in SPARQLClientPool.connection_pool:
SPARQLClientPool.connection_pool[endpoint_url] = SPARQLWrapper(endpoint_url)
return SPARQLClientPool.connection_pool[endpoint_url]
# safe instantiation
SPARQLClientPool.instance()
[docs]class RemoteSPARQLEndpointDataSource(DataSource):
"""
Fetches data from SPARQL endpoint. Can be used either with a SPARQL query or a URI to be described.
To query a SPARQL endpoint and get the results as *dict* object
>>> ds = RemoteSPARQLEndpointDataSource(sparql_endpoint_url)
>>> dict_object = ds.with_query(sparql_query_text)._fetch_tree()
unpack the content and error for a fail safe fetching
>>> dict_object, error_string = ds.with_query(sparql_query_text).fetch_tree()
To describe an URI and get the results as a pandas DataFrame
>>> pd_dataframe = ds.with_uri(existent_uri)._fetch_tree()
unpack the content and error for a fail safe fetching
>>> pd_dataframe, error_string = ds.with_uri(existent_uri).fetch_tree()
In case you want to target URI description from a Named Graph
>>> pd_dataframe, error_string = ds.with_uri(existent_uri,named_graph).fetch_tree()
"""
def __init__(self, endpoint_url):
self.endpoint = SPARQLClientPool.create_or_reuse_connection(endpoint_url)
self.__can_be_tree = True
self.__can_be_tabular = True
[docs] def with_query(self, sparql_query: str, substitution_variables: dict = None,
sparql_prefixes: str = "") -> 'RemoteSPARQLEndpointDataSource':
"""
Set the query text and return the reference to self for chaining.
:return:
"""
if substitution_variables:
template_query = SubstitutionTemplate(sparql_query)
sparql_query = template_query.safe_substitute(substitution_variables)
new_query = (sparql_prefixes + " " + sparql_query).strip()
self.endpoint.setQuery(new_query)
return self
[docs] def with_query_from_file(self, sparql_query_file_path: str, substitution_variables: dict = None,
prefixes: str = "") -> 'RemoteSPARQLEndpointDataSource':
"""
Set the query text and return the reference to self for chaining.
:return:
"""
with open(Path(sparql_query_file_path).resolve(), 'r') as file:
query_from_file = file.read()
if substitution_variables:
template_query = SubstitutionTemplate(query_from_file)
query_from_file = template_query.safe_substitute(substitution_variables)
new_query = (prefixes + " " + query_from_file).strip()
self.endpoint.setQuery(new_query)
return self
[docs] def with_uri(self, uri: str, graph_uri: Optional[str] = None) -> 'RemoteSPARQLEndpointDataSource':
"""
Set the query text and return the reference to self for chaining.
:return:
"""
if graph_uri:
self.endpoint.setQuery(f"DESCRIBE <{uri}> FROM <{graph_uri}>")
else:
self.endpoint.setQuery(f"DESCRIBE <{uri}>")
return self
[docs] def _fetch_tree(self):
if not self.endpoint.queryString or self.endpoint.queryString.isspace():
raise Exception("The query is empty.")
self.endpoint.setReturnFormat(JSON)
query = self.endpoint.query()
return query.convert()
[docs] def _fetch_tabular(self):
if not self.endpoint.queryString or self.endpoint.queryString.isspace():
raise Exception("The query is empty.")
self.endpoint.setReturnFormat(CSV)
query_result = self.endpoint.queryAndConvert()
return pd.read_csv(io.StringIO(str(query_result, encoding=DEFAULT_ENCODING)))
[docs] def _can_be_tree(self) -> bool:
return self.__can_be_tree
[docs] def _can_be_tabular(self) -> bool:
return self.__can_be_tabular
def __str__(self):
return f"from <...{str(self.endpoint.endpoint)[-30:]}> {str(self.endpoint.queryString)[:60]} ..."