""" :module PseudomonasDotComScraper: Hosting the PseudomonasDotComScraper, an API for the https://www.pseudomonas.com database web interface. """
from GenDBScraper.Utilities.json_utilities import JSONEncoder
from GenDBScraper.Utilities.web_utilities import guarded_get, is_good_response
# 3rd party and standard library imports
from bs4 import BeautifulSoup
from collections import namedtuple
from doi2bib import crossref
from io import StringIO
from pubmed_lookup import Publication, PubMedLookup
import json
import logging
import os
import pandas
import re
import tempfile
# Configure logging: timestamped messages at INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
)

# Define the query datastructure. Every field is optional and defaults to None.
pdc_query = namedtuple(
    'pdc_query',
    ['strain', 'feature', 'organism'],
    defaults=[None, None, None],
)
class PseudomonasDotComScraper():
    """ An API for the pseudomonas.com genome database using web scraping technology. """

    # Class constructor
    def __init__(self,
            query=None,
            ):
        """
        PseudomonasDotComScraper constructor.

        :param query: The query (or queries) to submit to the database. If None, the default query (strain 'sbw25') is used.
        :type query: (pdc_query || dict) or a list of these

        :example: scraper = PseudomonasDotComScraper(query={'strain' : 'sbw25', 'feature' : 'pflu0916'})
        :example: scraper = PseudomonasDotComScraper(query=pdc_query(strain='sbw25', feature='pflu0916'))

        """

        # Initialize all variables.
        self.__query = None
        self.__pdc_url = 'https://www.pseudomonas.com'
        self.__browser = None
        self.__connected = False

        # Set attributes via setter.
        self.query = query

    # Attribute accessors
    @property
    def connected(self):
        """ Whether the last call to connect() succeeded.

        :rtype: bool
        """
        return self.__connected

    @property
    def query(self):
        """ Get the query.

        :return: The list of validated queries.
        :rtype: list of pdc_query
        """
        return self.__query

    @query.setter
    def query(self, val):
        """ Set the query attribute.

        :param val: The value to set. None selects the default query (strain 'sbw25').
        :type val: (pdc_query | dict) or a list, tuple, or set of these

        :raises TypeError: 'val' (or one of its items) is neither a dict nor a pdc_query, or a query value is neither str nor None.
        :raises KeyError: A dict contains an unknown key, or both 'strain' and 'organism' are provided.
        """
        exc = TypeError("The parameter 'query' must be a dict or pdc_query or a list, tuple, or set of queries. Examples: query={'strain' : 'sbw25', 'feature' : 'pflu0916'}; query=pdc_query(strain='sbw25', feature='pflu0916') or query=[pdc_query(strain='sbw25', feature='pflu0916'), pdc_query(strain='sbw25', feature='pflu0917')].")

        if val is None:
            val = [pdc_query(strain='sbw25')]

        # A pdc_query is itself a tuple, so single queries must be recognised
        # before generic containers are.
        if isinstance(val, (dict, pdc_query)):
            val = [val]
        elif isinstance(val, (list, tuple, set)):
            # Work on a copy so the caller's container is left untouched.
            val = list(val)
        else:
            raise exc

        # Check the type of all items before inspecting any of them.
        for item in val:
            if not isinstance(item, (dict, pdc_query)):
                raise exc

        # Only these are acceptable query keywords.
        accepted_keys = ('strain', 'feature', 'organism')

        # Iterate over all queries.
        checked = []
        for item in val:
            # Check keys if dict.
            if isinstance(item, dict):
                for k in item:
                    if k not in accepted_keys:
                        raise KeyError("Only 'strain', 'feature', and 'organism' are acceptable keys.")

                # Complete missing keywords with None (without mutating the caller's dict).
                completed = {k: item.get(k) for k in accepted_keys}

                # Convert to pdc_query
                logging.info('Query dictionary passed to pseudomonas.com scraper will now be converted to a pdc_query object. See reference manual for more details.')
                item = _dict_to_pdc_query(**completed)

            # Check keywords are internally consistent.
            if item.organism is not None and item.strain is not None:
                raise KeyError("Invalid combination of query keywords: 'organism' must not be combined with 'strain'.")

            # Check all values are strings or None.
            for value in item:
                if not (isinstance(value, str) or value is None):
                    raise TypeError("All values in the query must be of type str.")

            checked.append(item)

        self.__query = checked

    def connect(self):
        """ Connect to the database.

        :raises ConnectionError: The start page could not be fetched or parsed.
        """
        try:
            self.__browser = BeautifulSoup(guarded_get(self.__pdc_url), 'html.parser')
        except Exception as err:
            self.__connected = False
            raise ConnectionError("Connecting to {0:s} failed. Make sure the URL is set correctly and is reachable.".format(self.__pdc_url)) from err

        self.__connected = True

    def run_query(self, query=None):
        """ Run a query on pseudomonas.com

        :param query: The query object to run. If given, it replaces the stored query.
        :type query: [list of] (pdc_query | dict)

        :return: The query results as a dictionary with 'strain__feature' keys
                 (the organism takes the place of the strain for organism queries).
        :rtype: dict

        :raises RuntimeError: connect() has not been called successfully.
        """

        # Check if we're connected. Bail out if not.
        if not self.__connected:
            raise RuntimeError("Not connected. Call .connect() before submitting the query.")

        # If provided, update the local query object. This way, user can submit a query at run time.
        if query is not None:
            self.query = query

        results = dict()
        for single_query in self.query:
            # Either strain or organism may be None, as may the feature; the
            # '{:s}' format spec would raise a TypeError on None.
            target = single_query.strain if single_query.strain is not None else single_query.organism
            key = "{0!s}__{1!s}".format(target or '', single_query.feature or '')
            results[key] = self._run_one_query(single_query)

        return results

    def _get_feature_url(self, query):
        """ Get the base URL for the queried feature (gene).

        :param query: Query object.
        :type query: pdc_query

        :return: The URL of the feature page, or None if the query names no feature.
        :rtype: str or None

        :raises ValueError: The query specifies neither 'strain' nor 'organism'.
        :raises IndexError: The search result page contains no link matching the feature.
        """

        _feature = query.feature
        if _feature is None:
            _feature = ''

        # Pick the search term: 'term1' is used for a strain, 'term2' for an organism.
        if query.strain is not None:
            term_key, term_value = 'term1', query.strain
        elif query.organism is not None:
            term_key, term_value = 'term2', query.organism
        else:
            raise ValueError("The query must specify either a 'strain' or an 'organism'.")

        # Without a feature there is no unique feature page to resolve.
        if _feature == '':
            return None

        # Assemble the html query.
        _url = self.__pdc_url + "/primarySequenceFeature/list?c1=name&v1={0:s}&e1=1&{1:s}={2:s}&assembly=complete".format(_feature, term_key, term_value)

        # Debug info.
        logging.debug("Will now open %s .", _url)

        # Get the soup for the assembled url.
        browser = BeautifulSoup(guarded_get(_url), 'html.parser')

        # The feature is listed in upper case on the result page.
        links = browser.find_all('a', string=re.compile(_feature.upper()))
        if not links:
            raise IndexError("No entry found for feature '{0:s}' at {1:s}.".format(_feature, _url))

        return self.__pdc_url + links[0].get('href')

    def _run_one_query(self, query):
        """ Workhorse function to run a query.

        :param query: Query object to submit.
        :type query: pdc_query

        :return: The scraped tables, keyed by panel heading.
        :rtype: dict

        :raises ValueError: The query does not name a feature.
        """

        # Setup dict to store the query results.
        panels = dict()

        feature_url = self._get_feature_url(query)
        if feature_url is None:
            raise ValueError("The query must specify a 'feature' to pull data for.")

        # Go through all panels and pull data.
        self._get_overview(feature_url, panels)
        self._get_sequences(feature_url, panels)
        self._get_functions_pathways_go(feature_url, panels)
        self._get_motifs(feature_url, panels)
        self._get_operons(feature_url, panels)
        self._get_transposon_insertions(feature_url, panels)
        self._get_updates(feature_url, panels)
        self._get_orthologs(feature_url, panels)

        # All done, return.
        return panels

    def _get_overview(self, url, panels):
        """ Parse the 'Overview' tab and extract the tables.

        :param url: The base URL of the feature.
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        # Get overview data.
        overview_url = url + "&view=overview"

        # Get the soup.
        browser = BeautifulSoup(guarded_get(overview_url), 'lxml')

        # Loop over headings and get each table as pandas.DataFrame, indexed by its first column.
        for heading in ("Gene Feature Overview",
                        "Cross-References",
                        "Product",
                        "Subcellular localization",
                        "Pathogen Association Analysis",
                        "Orthologs/Comparative Genomics",
                        "Interactions",
                        ):
            panels[heading] = _pandasDF_from_heading(browser, heading, 0)

        panels["References"] = _pandas_references(browser)

    def _get_sequences(self, url, panels):
        """ Parse the 'Sequences' tab and extract the tables.

        :param url: The base URL of the feature.
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        sequence_url = url + "&view=sequence"
        browser = BeautifulSoup(guarded_get(sequence_url), 'html.parser')

        panels['Sequence Data'] = _pandasDF_from_heading(browser, "Sequence Data", None)

    def _get_functions_pathways_go(self, url, panels):
        """ Parse the 'Function/Pathways/GO' tab and extract the tables.

        :param url: The base URL of the feature.
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        # Get functions, pathways, GO
        function_url = url + "&view=functions"
        browser = BeautifulSoup(guarded_get(function_url), 'html.parser')

        for heading in ("Gene Ontology",
                        "Functional Classifications Manually Assigned by PseudoCAP",
                        "Functional Predictions from Interpro",
                        ):
            panels[heading] = _pandasDF_from_heading(browser, heading, None)

    def _get_motifs(self, url, panels):
        """ Placeholder for the 'Motifs' tab.

        Parsing of this tab is not implemented; the 'Motifs' entry is always None.

        :param url: The base URL of the feature (currently unused).
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        # TODO: Fetch url + "&view=motifs" and parse it once the table layout is known.
        panels["Motifs"] = None

    def _get_operons(self, url, panels):
        """ Parse the 'operons' tab and extract the tables.

        Stores a dict under panels['Operons'] that maps each operon name to a dict with
        the keys 'Name', 'Genes', 'Evidence', 'References' and 'Cross-References'.

        :param url: The base URL of the feature.
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        # Get operons tab.
        operons_url = url + "&view=operons"
        soup = BeautifulSoup(guarded_get(operons_url), 'lxml')

        table_heading = "Operons"

        # Setup empty dict to store results.
        operons_dict = dict()

        # Navigate to heading.
        heading = soup.find('h3', string=re.compile(table_heading))
        if heading is None:
            logging.warning("No operon data found.")
            panels['Operons'] = operons_dict
            return

        # Get content.
        operons = heading.find_next_siblings('table')

        # Patterns used to strip layout characters from the scraped text.
        tabs = re.compile("\t*")
        whitespace = re.compile(r"[\t\n\s]")
        whitespace_and_dots = re.compile(r"[\t\n\s\.]")

        # Loop over operons.
        for operon in operons:
            operon_dict = dict()

            try:
                # Wrap in StringIO: passing literal html to read_html is deprecated in recent pandas.
                tmp = pandas.read_html(StringIO(str(operon)))
            except Exception:
                logging.warning("No operon data found.")
                break

            # The operon name sits on the third line of the "Operon name" text node.
            name = operon.findChild(string=re.compile("Operon name"))
            name = tabs.sub("", name)
            name = name.split("\n")[2]
            operon_dict['Name'] = name

            # The second table parsed from the operon markup holds the genes.
            operon_dict['Genes'] = tmp[1]

            evidence = str(operon.find(string=re.compile('Evidence')).find_next('div').text)
            evidence = whitespace_and_dots.sub("", evidence)
            operon_dict['Evidence'] = evidence

            references = operon.find_all(string=re.compile('PubMed ID'))
            refs = []
            for ref in references:
                pubmed = ref.find_next_sibling('a')
                pubmed_url = pubmed.get('href')
                pubmed_id = whitespace.sub('', str(pubmed.text))
                lookup = PubMedLookup(pubmed_id, '')
                citation = Publication(lookup).cite()

                refs.append(dict(pubmed_url=pubmed_url, citation=citation))

            operon_dict['References'] = pandas.DataFrame(refs)

            cross_references = str(operon.find(string=re.compile("Cross-References")).find_next('div').find_next('div').text)
            cross_references = whitespace.sub("", cross_references)
            operon_dict['Cross-References'] = cross_references

            operons_dict[name] = operon_dict

        panels['Operons'] = operons_dict

    def _get_transposon_insertions(self, url, panels):
        """ Parse the 'transposons' tab and extract the tables.

        Stores a dict under panels['Transposon Insertions'] that maps each matching
        heading text to the list of tables found in the heading's parent element.

        :param url: The base URL of the feature.
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        # Get transposons tab.
        transposons_url = url + "&view=transposons"
        browser = BeautifulSoup(guarded_get(transposons_url), 'html.parser')

        table_heading = "Transposon Insertions"

        # Navigate to headings.
        headings = browser.find_all('h3', string=re.compile(table_heading))

        layout_chars = re.compile("[\n\t]")

        transposon_dict = dict()
        for h in headings:
            try:
                # Wrap in StringIO: passing literal html to read_html is deprecated in recent pandas.
                td = pandas.read_html(StringIO(str(h.parent)))
            except ValueError:
                td = [pandas.DataFrame()]
                logging.warning("No table found, will return empty DataFrame.")

            key = layout_chars.sub("", h.get_text())
            transposon_dict[key] = td

        panels[table_heading] = transposon_dict

    def _get_updates(self, url, panels):
        """ Parse the 'Updates' tab and extract the tables.

        Stores the list of tables found next to the 'Annotation Updates' heading under
        panels['Annotation Updates']; a list holding one empty DataFrame if none is found.

        :param url: The base URL of the feature.
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        # Get updates tab.
        updates_url = url + "&view=updates"
        browser = BeautifulSoup(guarded_get(updates_url), 'html.parser')

        heading = browser.find('h3', string=re.compile('Annotation Updates'))

        annotation_table = None
        if heading is not None:
            try:
                annotation_table = pandas.read_html(StringIO(str(heading.parent)))
            except ValueError:
                annotation_table = None

        if annotation_table is None:
            annotation_table = [pandas.DataFrame()]
            logging.warning("No table found, will return empty DataFrame.")

        panels['Annotation Updates'] = annotation_table

    def _get_orthologs(self, url, panels):
        """ Parse the 'Orthologs' tab and extract the tables.

        :param url: The base URL of the feature.
        :type url: str

        :param panels [in/out]: The datastructure into which the tables are stored.
        :type panels: dict
        """

        ###
        # Orthologs are different in that they are queried from the pdc
        # orthologs database. We construct the corresponding URL and pull
        # the tab file directly.
        # TODO: or should we pull the fasta?

        # Get the pseudomonas.com id for this feature.
        pdc_id = url.split('id=')[1]

        # Construct the URL for the orthologs DB.
        orthologs_url = '/'.join([self.__pdc_url, 'orthologs', 'list?format=tab&extension=tab&id={}'.format(pdc_id)])

        # GET the tab separated data. Fall back to an empty DataFrame on any failure.
        try:
            # NOTE(review): guarded_get's return type is not visible in this file. It is
            # passed straight to BeautifulSoup elsewhere, so str, bytes or a file-like
            # object are handled here -- confirm against web_utilities.
            payload = guarded_get(orthologs_url)
            if hasattr(payload, 'read'):
                payload = payload.read()
            if isinstance(payload, bytes):
                payload = payload.decode('utf-8')

            # Buffer the data.
            with StringIO(payload) as stream:
                df = pandas.read_csv(stream, sep='\t')
        except Exception:
            logging.warning("No orthologs found. Will return empty DataFrame.")
            df = pandas.DataFrame()

        panels["Orthologs"] = df

    def to_json(self, results, outfile=None):
        """ Serialize results dictionary to json.

        :param results: The results dictionary (dict of pandas.DataFrame).
        :type results: dict

        :param outfile: Path to file for writing query results to. Default: None, will write to temp file.
        :type outfile: str

        :raises IOError: 'outfile' not writable.

        :return: If successful, path to written file.

        """

        if outfile is None:
            # mkstemp returns an open OS-level file descriptor; close it right away,
            # the file is reopened by path for writing.
            fd, file_path = tempfile.mkstemp(prefix="pseudomonas_dot_com_query_", suffix=".json")
            os.close(fd)
        else:
            file_path = outfile

        # Call the workhorse.
        _serialize(file_path, results)

        return file_path

    def from_json(self, infile):
        """ Deserialize a json file into a results dictionary (a dict of pandas.DataFrame).

        :param infile: The file path of the json file to load.
        :type infile: str

        :return: The results dictionary.
        :rtype: dict
        """

        return _deserialize(infile)
def _serialize(path, obj):
    """ Serialize the passed dictionary (obj) to path.

    :param path: Path of the file to write; it is opened in text mode 'w'.
    :type path: str

    :param obj: The object to dump as json, encoded through JSONEncoder.
    :type obj: dict
    """
    with open(path, 'w') as json_file:
        json.dump(obj, fp=json_file, cls=JSONEncoder)
def _deserialize(path):
""" """
""" Deserialize a json file (located at 'path') into a dictionary. Reconstruct pandas.DataFrames from loaded content. """
with open(path, 'r') as fp:
loaded = json.load(fp)
ret = {}
for query, result in loaded.items():
ret[query] = {}
for key, value in result.items():
ret[query][key] = pandas.read_json(value)
return ret
def _dict_to_pdc_query(**kwargs):
    """
    Convert a dictionary of query key-value pairs to a pdc_query instance.

    Keys missing from kwargs are set to None; keys other than 'strain',
    'feature' and 'organism' are ignored.

    :param kwargs: Dictionary of query key-value pairs.
    :type kwargs: dict

    :return: The corresponding query object.
    :rtype: pdc_query
    """
    return pdc_query(strain=kwargs.get('strain'),
                     feature=kwargs.get('feature'),
                     organism=kwargs.get('organism'),
                     )
def _pandasDF_from_heading(soup, table_heading, index_column=0):
""" """
""" Find the table that belongs to the passed heading in a formatted html tree (the soup).
:param soup: The html tree to parse.
:type soup: BeautifulSoup
:param table_heading: The table heading to find.
:type table_heading: str
:param index_column: Which column to use as the pandas.DataFrame's index.
:type index_column: int
:return: The table under the passed heading as a pandas.pandas.DataFrame
:rtype: pandas.pandas.DataFrame
"""
# Get table html string.
table_ht = str(soup.find('h3', string=re.compile(table_heading)).find_next())
try:
df = pandas.read_html(table_ht, index_col=index_column)[0]
except:
logging.warning("No data found for %s. Will return empty pandas.DataFrame.", table_heading)
df = pandas.DataFrame()
return df
def _pandas_references(soup):
""" Extract references from given html soup and return them as pandas pandas.DataFrame. """
# Setup container to store parsed information.
raw = []
# Get the References "table".
ref_soup = soup.find("h3", string=re.compile('^References'))
# Get all <a> tags.
a_tags = ref_soup.find_next().find_all('a')
# Loop over all <a> tags
for i,a in enumerate(a_tags):
# Get the link text.
pubmed_link=a.get('href')
citation = Publication(PubMedLookup(pubmed_link, '')).cite()
raw.append(dict(pubmed_url=pubmed_link, citation=citation))
# Return as pandas.DataFrame.
return pandas.DataFrame(raw)
def _get_doi_from_ncbi(pubmed_link):
""" Extract the DOI from a pubmed link. """
if (pubmed_link != ''):
doi_soup = BeautifulSoup(guarded_get(pubmed_link), 'lxml')
line = doi_soup.find(string=re.compile("DOI")).find_parent().find_parent()
a = line.find('a', string=re.compile('10\.[0-9]*\/'))
doi_string = a.text
doi = re.sub("[\t,\n,\s]","",doi_string)
return doi
def _get_bib_from_doi(doi):
    """ Get bibliographic information from a given doi.

    :param doi: The DOI to look up via crossref.
    :type doi: str

    :return: Dictionary with the keys 'doi', 'first_author', 'title', 'container',
             'volume', 'page' and 'date', or None if the lookup did not succeed.
    :rtype: dict or None
    """
    # Get bib data. (Do not call the payload 'json': that would shadow the json module.)
    success, payload = crossref.get_json(doi)

    if not (success and payload['status'].lower() == 'ok'):
        return None

    message = payload['message']
    first_author = message['author'][0]

    # 'date-parts' may hold fewer than three items (presumably year only, or year and
    # month -- see the Crossref API docs); format whatever is present as YYYY[-MM[-DD]].
    date_parts = message['published-print']['date-parts'][0]
    date = "-".join(["{0:d}".format(date_parts[0])] +
                    ["{0:02d}".format(part) for part in date_parts[1:3]])

    entry = {'doi': doi,
             'first_author': "{0:s}, {1:s}".format(first_author['family'],
                                                   first_author['given']),
             'title': message['title'][0],
             'container': message['container-title'][0],
             'volume': message['volume'],
             'page': message['page'],
             'date': date,
             }

    return entry
def _run_from_cli(args):
    """ Called if run via command line interface.

    :param args: Command line arguments; the attributes 'strain', 'feature', 'organism' and 'outfile' are read.
    :type args: argparse.ArgumentsObject

    :return: 1 if the query succeeded and the results were written, 0 if connecting or querying failed.
    :rtype: int

    :raises Exception: Whatever to_json() raised if the results could not be written.
    """

    # Construct the query.
    query = pdc_query(args.strain, args.feature, args.organism)

    # Construct the Scraper.
    scraper = PseudomonasDotComScraper(query)

    try:
        scraper.connect()
    except Exception:
        logging.error("Could not connect to pseudomonas.com .")
        return 0

    # Run the query and serialize.
    try:
        results = scraper.run_query()
    except Exception:
        logging.error("Query failed.")
        return 0

    try:
        path = scraper.to_json(results, args.outfile)
    except Exception:
        # Log and let the original error propagate to the caller.
        logging.error("Could not write results to disk.")
        raise

    # Message.
    logging.info("Query was successfull. Results stored in %s.", path)

    return 1
def _cleanup_str(chars, string):
""" Replace all characters in chars in string by "". """
patterns = []
for c in chars:
patterns.append(re.compile(c))
space = re.compile(" *")
tab = re.compile("\t*")
if __name__ == "__main__":

    from argparse import ArgumentParser

    # Setup argument parser.
    parser = ArgumentParser()

    # Optional output path; a temporary file is used if omitted.
    parser.add_argument("-o",
                        "--outfile",
                        dest="outfile",
                        default=None,
                        required=False,
                        help="Where to write the query results.",
                        )

    # The feature is mandatory on the command line.
    parser.add_argument("-f",
                        "--feature",
                        dest="feature",
                        default=None,
                        required=True,
                        help="The gene/feature to query from pseudomonas.com.")

    # Exactly one of strain / organism has to be given.
    org_group = parser.add_mutually_exclusive_group(required=True)
    org_group.add_argument("-s",
                           "--strain",
                           dest="strain",
                           default=None,
                           help="The strain to query from pseudomonas.com. Mutually exclusive with parameter -O/--organism option.")

    org_group.add_argument("-O",
                           "--organism",
                           dest="organism",
                           default=None,
                           help="The organism to query from pseudomonas.com. Mutually exclusive with parameter 'strain'.")

    # Parse arguments.
    args = parser.parse_args()

    _run_from_cli(args)