Source code for astroquery.linelists.cdms.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import requests
import os

from bs4 import BeautifulSoup
import astropy.units as u
from astropy import table
from astropy.io import ascii
from astroquery.query import BaseQuery
# import configurable items declared in __init__.py
from astroquery.linelists.cdms import conf
from astroquery.exceptions import InvalidQueryError, EmptyResponseError
from astroquery.linelists.core import parse_letternumber, parse_molid
from astroquery.utils import process_asyncs
from astroquery import log

import re

__all__ = ['CDMS', 'CDMSClass']


def data_path(filename):
    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    return os.path.join(data_dir, filename)


[docs] class CDMSClass(BaseQuery): # use the Configuration Items imported from __init__.py URL = conf.search SERVER = conf.server CLASSIC_URL = conf.classic_server TIMEOUT = conf.timeout MALFORMATTED_MOLECULE_LIST = ['017506 NH3-wHFS', '028528 H2NC', '058501 H2C2S', '064527 HC3HCN'] def __init__(self, *, fallback_to_getmolecule=False): """ CDMS line list query class Parameters ---------- fallback_to_getmolecule : bool, optional If True, when a molecule is requested that results in a malformatted or unparseable response, ``get_molecule`` will be attempted automatically to retrieve the full catalog for that molecule. In this case, no frequency-based selection will be applied. """ super().__init__() def _mol_to_payload(self, molecule, *, parse_name_locally=False, flags=0): if parse_name_locally: self.lookup_ids = build_lookup() luts = self.lookup_ids.find(molecule, flags) if len(luts) == 0: raise InvalidQueryError('No matching species found. Please ' 'refine your search or read the Docs ' 'for pointers on how to search.') return tuple(f"{val:06d} {key}" for key, val in luts.items())[0] else: return molecule
[docs] def query_lines(self, min_frequency, max_frequency, *, min_strength=-500, molecule='All', temperature_for_intensity=300, flags=0, parse_name_locally=False, get_query_payload=False, fallback_to_getmolecule=False, verbose=False, cache=True): # Check if a malformatted molecule was requested and use fallback if enabled # accounts for three formats, e.g.: '058501' or 'H2C2S' or '058501 H2C2S' badlist = (self.MALFORMATTED_MOLECULE_LIST + [y for x in self.MALFORMATTED_MOLECULE_LIST for y in x.split()]) # extract molecule from the response or request requested_molecule = self._mol_to_payload(molecule, parse_name_locally=parse_name_locally, flags=flags) if molecule != 'All' else None if requested_molecule and requested_molecule in badlist and not get_query_payload: if fallback_to_getmolecule: try: return self.get_molecule(requested_molecule[:6]) except ValueError as ex: # try to give the users good guidance on which parameters will work if "molecule_id should be an integer or a length-6 string of numbers" in str(ex): if parse_name_locally: raise ValueError(f"Molecule {molecule} could not be parsed or identified." " Check that the name was correctly specified.") else: raise ValueError(f"Molecule {molecule} needs to be formatted as" " a 6-digit string ID for the get_molecule fallback to work." " Try setting parse_name_locally=True " "to turn your molecule name into a CDMS number ID.") else: raise ex else: raise ValueError(f"Molecule {requested_molecule} is known not to comply with standard CDMS format. " f"Try get_molecule({requested_molecule}) instead or set " f"CDMS.fallback_to_getmolecule = True.") else: response = self.query_lines_async(min_frequency=min_frequency, max_frequency=max_frequency, min_strength=min_strength, molecule=molecule, temperature_for_intensity=temperature_for_intensity, flags=flags, parse_name_locally=parse_name_locally, get_query_payload=get_query_payload, cache=cache) if get_query_payload: return response else: return self._parse_result(response, molname=molecule, verbose=verbose)
[docs] def query_lines_async(self, min_frequency, max_frequency, *, min_strength=-500, molecule='All', temperature_for_intensity=300, flags=0, parse_name_locally=False, get_query_payload=False, cache=True): """ Creates an HTTP POST request based on the desired parameters and returns a response. Parameters ---------- min_frequency : `astropy.units.Quantity` or None Minimum frequency (or any spectral() equivalent). ``None`` can be interpreted as zero. max_frequency : `astropy.units.Quantity` or None Maximum frequency (or any spectral() equivalent). ``None`` can be interpreted as infinite. min_strength : int, optional Minimum strength in catalog units, the default is -500 molecule : list or string if parse_name_locally=False, string of regex if parse_name_locally=True, optional Identifiers of the molecules to search for. If this parameter is not provided the search will match any species. Default is 'All'. As a first pass, the molecule will be searched for with a direct string match. If no string match is found, a regular expression match is attempted. Note that if the molecule name regex contains parentheses, they must be escaped. For example, 'H2C(CN)2.*' must be specified as 'H2C\\(CN\\)2.*' (but because of the first-attempt full-string matching, 'H2C(CN)2' will match that molecule successfully). temperature_for_intensity : float The temperature to use when computing the intensity Smu^2. Set to 300 by default for compatibility with JPL and the native catalog format, which defaults to 300. ** If temperature is set to zero, the return value in this column will be the Einstein A value ** flags : int, optional Regular expression flags. Default is set to 0 parse_name_locally : bool, optional When set to True it allows the method to parse through catdir.cat (see `get_species_table`) in order to match the regex inputted in the molecule parameter and request the corresponding tags of the matches instead. Default is set to False get_query_payload : bool, optional When set to `True` the method should return the HTTP request parameters as a dict. Default value is set to False cache : bool Defaults to True. If set overrides global caching behavior. See :ref:`caching documentation <astroquery_cache>`. Returns ------- response : `requests.Response` The HTTP response returned from the service. Examples -------- >>> table = CDMS.query_lines(min_frequency=100*u.GHz, ... max_frequency=110*u.GHz, ... min_strength=-500, ... molecule="018505 H2O+") # doctest: +REMOTE_DATA >>> print(table) # doctest: +SKIP FREQ ERR LGINT DR ELO GUP TAG QNFMT Ju Ku vu Jl Kl vl F name MHz MHz MHz nm2 1 / cm ----------- ----- ------- --- -------- --- ----- ----- --- --- --- --- --- --- ----------- ---- 103614.4941 2.237 -4.1826 3 202.8941 8 18505 2356 4 1 4 4 0 4 3 2 1 3 0 3 H2O+ 107814.8763 148.6 -5.4438 3 878.1191 12 18505 2356 6 5 1 7 1 6 7 4 4 8 1 7 H2O+ 107822.3481 148.6 -5.3846 3 878.1178 14 18505 2356 6 5 1 7 1 7 7 4 4 8 1 8 H2O+ 107830.1216 148.6 -5.3256 3 878.1164 16 18505 2356 6 5 1 7 1 8 7 4 4 8 1 9 H2O+ """ # first initialize the dictionary of HTTP request parameters payload = dict() if min_frequency is not None and max_frequency is not None: # allow setting payload without having *ANY* valid frequencies set min_frequency = min_frequency.to(u.GHz, u.spectral()) max_frequency = max_frequency.to(u.GHz, u.spectral()) if min_frequency > max_frequency: raise InvalidQueryError("min_frequency must be less than max_frequency") payload['MinNu'] = min_frequency.value payload['MaxNu'] = max_frequency.value payload['UnitNu'] = 'GHz' payload['StrLim'] = min_strength payload['temp'] = temperature_for_intensity payload['logscale'] = 'yes' payload['mol_sort_query'] = 'tag' payload['sort'] = 'frequency' payload['output'] = 'text' payload['but_action'] = 'Submit' # changes interpretation of query self._last_query_temperature = temperature_for_intensity if molecule == 'All': payload['Moleculesgrp'] = 'all species' else: if molecule is not None: payload['Molecules'] = self._mol_to_payload(molecule, parse_name_locally=parse_name_locally, flags=flags) if get_query_payload: return payload # BaseQuery classes come with a _request method that includes a # built-in caching system response = self._request(method='POST', url=self.URL, data=payload, timeout=self.TIMEOUT, cache=cache) response.raise_for_status() if 'Ups! Code: 011' in response.text: raise InvalidQueryError("Specified query was invalid. Check that" " a valid molecule name was specified. " f"Payload was {payload}.") soup = BeautifulSoup(response.text, 'html.parser') ok = False urls = [x.attrs['src'] for x in soup.find_all('frame',)] for url in urls: if 'tab' in url and 'head' not in url: ok = True break if not ok: raise EmptyResponseError("Did not find table in response") baseurl = self.URL.split('cgi-bin')[0] fullurl = f'{baseurl}/{url}' response2 = self._request(method='GET', url=fullurl, timeout=self.TIMEOUT, cache=cache) return response2
query_lines.__doc__ = process_asyncs.async_to_sync_docstr(query_lines_async.__doc__) def _parse_result(self, response, *, verbose=False, molname=None): """ Parse a response into an `~astropy.table.Table` The catalog data files are composed of fixed-width card images, with one card image per spectral line. The format of each card image is similar to the JPL version: FREQ, ERR, LGINT, DR, ELO, GUP, TAG, QNFMT, QN', QN" (F13.4,F8.4, F8.4, I2,F10.4, I3, I7, I4, 6I2, 6I2) but the formats are somewhat different and are encoded below. The first several entries are the same, but more detail is appended at the end of the line FREQ: Frequency of the line in MHz. ERR: Estimated or experimental error of FREQ in MHz. LGINT: Base 10 logarithm of the integrated intensity in units of nm^2 MHz at 300 K. DR: Degrees of freedom in the rotational partition function (0 for atoms, 2 for linear molecules, and 3 for nonlinear molecules). ELO: Lower state energy in cm^{-1} relative to the ground state. GUP: Upper state degeneracy. MOLWT: The first half of the molecular weight tag, which is the mass in atomic mass units (Daltons). TAG: Species tag or molecular identifier. This only includes the last 3 digits of the CDMS tag A negative value of MOLWT flags that the line frequency has been measured in the laboratory. We record this boolean in the 'Lab' column. ERR is the reported experimental error. QNFMT: Identifies the format of the quantum numbers Ju/Ku/vu and Jl/Kl/vl are the upper/lower QNs F: the hyperfine lines name: molecule name The full detailed description is here: https://cdms.astro.uni-koeln.de/classic/predictions/description.html#description """ if 'Zero lines were found' in response.text: raise EmptyResponseError(f"Response was empty; message was '{response.text}'.") soup = BeautifulSoup(response.text, 'html.parser') text = soup.find('pre').text # this is a different workaround to try to make _some_ of the bad molecules parseable # (it doesn't solve all of them, which is why the above fallback exists) need_to_filter_bad_molecules = False for bad_molecule in self.MALFORMATTED_MOLECULE_LIST: if text.find(bad_molecule.split()[1]) > -1: need_to_filter_bad_molecules = True break if need_to_filter_bad_molecules: text_new = '' text = text.split('\n') for line in text: need_to_include_line = True for bad_molecule in self.MALFORMATTED_MOLECULE_LIST: if line.find(bad_molecule.split()[1]) > -1: need_to_include_line = False break if need_to_include_line: text_new = text_new + '\n' + line text = text_new starts = {'FREQ': 0, 'ERR': 14, 'LGINT': 25, 'DR': 36, 'ELO': 38, 'GUP': 47, 'TAG': 50, 'QNFMT': 57, 'Ju': 61, 'Ku': 63, 'vu': 65, 'F1u': 67, 'F2u': 69, 'F3u': 71, 'Jl': 73, 'Kl': 75, 'vl': 77, 'F1l': 79, 'F2l': 81, 'F3l': 83, 'name': 89} try: result = ascii.read(text, header_start=None, data_start=0, comment=r'THIS|^\s{12,14}\d{4,6}.*', names=list(starts.keys()), col_starts=list(starts.values()), format='fixed_width', fast_reader=False) result['FREQ'].unit = u.MHz result['ERR'].unit = u.MHz result['MOLWT'] = [int(x/1e3) for x in result['TAG']] result['Lab'] = result['MOLWT'] < 0 result['MOLWT'] = np.abs(result['MOLWT']) result['MOLWT'].unit = u.Da fix_keys = ['GUP'] for suf in 'ul': for qn in ('J', 'v', 'K', 'F1', 'F2', 'F3'): qnind = qn+suf fix_keys.append(qnind) for key in fix_keys: if not np.issubdtype(result[key].dtype, np.integer): intcol = np.array(list(map(parse_letternumber, result[key])), dtype=int) result[key] = intcol # if there is a crash at this step, something went wrong with the query # and the _last_query_temperature was not set. This shouldn't ever # happen, but, well, I anticipate it will. if self._last_query_temperature == 0: result.rename_column('LGINT', 'LGAIJ') result['LGAIJ'].unit = u.s**-1 else: result['LGINT'].unit = u.nm**2 * u.MHz result['ELO'].unit = u.cm**(-1) except ValueError as ex: # Give users a more helpful exception when parsing fails new_message = ("Failed to parse CDMS response. This may be caused by a malformed search return. " f"You can check this by running `CDMS.get_molecule('{molname}')` instead; if it works, the " "problem is caused by the CDMS search interface and cannot be worked around.") raise ValueError(new_message) from ex return result
[docs] def get_species_table(self, *, catfile='partfunc.cat', use_cached=True, catfile_url=conf.catfile_url, catfile2='catdir.cat', catfile_url2=conf.catfile_url2, write_new_species_cache=False): """ A directory of the catalog is found in a file called 'catdir.cat.' The table is derived from https://cdms.astro.uni-koeln.de/classic/entries/partition_function.html Parameters ---------- catfile : str, name of file, default 'catdir.cat' The catalog file, installed locally along with the package use_cached : bool, optional If True, use the cached file if it exists. If False, download the file from the CDMS server and save it to the cache if ``write_new_species_cache`` is set. write_new_species_cache : bool, optional *Overrides ``use_cached=True``*. If True, write the species data table files to the CDMS data directory. Use this option if you need to update the index from CDMS; this should be set to False for testing. Returns ------- Table: `~astropy.table.Table` | tag : The species tag or molecular identifier. | molecule : An ASCII name for the species. | #line : The number of lines in the catalog. | lg(Q(n)) : A seven-element vector containing the base 10 logarithm of the partition function. """ if use_cached and not write_new_species_cache: try: result = ascii.read(data_path(catfile), format='fixed_width', delimiter='|') result2 = ascii.read(data_path(catfile2), format='fixed_width', delimiter='|') except UnicodeDecodeError: with open(data_path(catfile), 'rb') as fh: content = fh.read() text = content.decode('ascii', errors='replace') result = ascii.read(text, format='basic', delimiter='|') with open(data_path(catfile2), 'rb') as fh: content = fh.read() text = content.decode('ascii', errors='replace') result2 = ascii.read(text, format='basic', delimiter='|') else: result = retrieve_catfile(catfile_url) result2 = retrieve_catfile2(catfile_url2) if write_new_species_cache: result.write(data_path(catfile), format='ascii.fixed_width', delimiter='|', overwrite=True) result2.write(data_path(catfile2), format='ascii.fixed_width', delimiter='|', overwrite=True) merged = table.join(result, result2, keys=['tag']) if not all(merged['#lines'] == merged['# lines']): raise ValueError("Inconsistent table of molecules from CDMS.") del merged['# lines'] # reorder columns result = merged[['tag', 'molecule', 'Name', '#lines', 'lg(Q(1000))', 'lg(Q(500))', 'lg(Q(300))', 'lg(Q(225))', 'lg(Q(150))', 'lg(Q(75))', 'lg(Q(37.5))', 'lg(Q(18.75))', 'lg(Q(9.375))', 'lg(Q(5.000))', 'lg(Q(2.725))', 'Ver.', 'Documentation', 'Date of entry', 'Entry']] meta = {'lg(Q(1000))': 1000.0, 'lg(Q(500))': 500.0, 'lg(Q(300))': 300.0, 'lg(Q(225))': 225.0, 'lg(Q(150))': 150.0, 'lg(Q(75))': 75.0, 'lg(Q(37.5))': 37.5, 'lg(Q(18.75))': 18.75, 'lg(Q(9.375))': 9.375, 'lg(Q(5.000))': 5.0, 'lg(Q(2.725))': 2.725} def tryfloat(x): try: return float(x) except ValueError: return np.nan for key in meta: result[key].meta = {'Temperature (K)': meta[key]} result[key] = np.array([tryfloat(val) for val in result[key]]) result.meta = {'Temperature (K)': [1000., 500., 300., 225., 150., 75., 37.5, 18.75, 9.375, 5., 2.725]} result.add_index('tag') return result
[docs] def get_molecule(self, molecule_id, *, cache=True, return_response=False): """ Retrieve the whole molecule table for a given molecule id Parameters ---------- molecule_id : int or str The molecule tag/identifier. Can be an integer (e.g., 18003 for H2O) or a zero-padded 6-character string (e.g., '018003'). cache : bool Defaults to True. If set overrides global caching behavior. See :ref:`caching documentation <astroquery_cache>`. return_response : bool, optional If True, return the raw `requests.Response` object instead of parsing the response. If this is set, the response will be returned whether or not it was successful. Default is False. """ molecule_id = parse_molid(molecule_id) url = f'{self.CLASSIC_URL}/entries/c{molecule_id}.cat' response = self._request(method='GET', url=url, timeout=self.TIMEOUT, cache=cache) if return_response: return response response.raise_for_status() if 'Zero lines were found' in response.text: raise EmptyResponseError(f"Response was empty; message was '{response.text}'.") result = self._parse_cat(response.text) species_table = self.get_species_table() result.meta = dict(species_table.loc[int(molecule_id)]) return result
def _parse_cat(self, text, *, verbose=False): """ Parse a CDMS-format catalog file into an `~astropy.table.Table`. The catalog data files are composed of 80-character card images. Format: [F13.4, 2F8.4, I2, F10.4, I3, I7, I4, 12I2]: FREQ, ERR, LGINT, DR, ELO, GUP, TAG, QNFMT, QN Parameters ---------- text : str The catalog file text content. verbose : bool, optional Not used currently. Returns ------- Table : `~astropy.table.Table` Parsed catalog data. """ # Column start positions starts = {'FREQ': 0, 'ERR': 14, 'LGINT': 22, 'DR': 30, 'ELO': 32, 'GUP': 42, 'TAG': 44, 'QNFMT': 51, 'Q1': 55, 'Q2': 57, 'Q3': 59, 'Q4': 61, 'Q5': 63, 'Q6': 65, 'Q7': 67, 'Q8': 69, 'Q9': 71, 'Q10': 73, 'Q11': 75, 'Q12': 77, 'Q13': 79, 'Q14': 81, } result = ascii.read(text, header_start=None, data_start=0, comment=r'THIS|^\s{12,14}\d{4,6}.*', names=list(starts.keys()), col_starts=list(starts.values()), format='fixed_width', fast_reader=False) # Ensure TAG is integer type for computation # int truncates - which is what we want result['TAG'] = result['TAG'].astype(int) result['MOLWT'] = [int(x/1e3) for x in result['TAG']] result['FREQ'].unit = u.MHz result['ERR'].unit = u.MHz result['Lab'] = result['MOLWT'] < 0 result['MOLWT'] = np.abs(result['MOLWT']) result['MOLWT'].unit = u.Da fix_keys = ['GUP'] for qn in (f'Q{ii}' for ii in range(1, 15)): fix_keys.append(qn) log.debug(f"fix_keys: {fix_keys} should include Q1, Q2, ..., Q14 and GUP") for key in fix_keys: if not np.issubdtype(result[key].dtype, np.integer): intcol = np.array(list(map(parse_letternumber, result[key])), dtype=int) if any(intcol == -999999): intcol = np.ma.masked_where(intcol == -999999, intcol) result[key] = intcol if not np.issubdtype(result[key].dtype, np.integer): raise ValueError(f"Failed to parse {key} as integer") result['LGINT'].unit = u.nm**2 * u.MHz result['ELO'].unit = u.cm**(-1) return result
CDMS = CDMSClass() class Lookuptable(dict): def find(self, st, flags): """ Search dictionary keys for a regex match to string s Parameters ---------- s : str String to compile as a regular expression Can be entered non-specific for broader results ('H2O' yields 'H2O' but will also yield 'HCCCH2OD') or as the specific desired regular expression for catered results, for example: ('H2O$' yields only 'H2O') flags : int Regular expression flags. Returns ------- The dictionary containing only values whose keys match the regex """ if st in self: return {st: self[st]} out = {} for kk, vv in self.items(): # note that the string-match attempt here differs from the jplspec # implementation match = (st in kk) or re.search(st, str(kk), flags=flags) if match: out[kk] = vv return out def build_lookup(): result = CDMS.get_species_table() # start with the 'molecule' column keys = list(result['molecule'][:]) # convert NAME column to list values = list(result['tag'][:]) # convert TAG column to list dictionary = dict(zip(keys, values)) # make k,v dictionary # repeat with the Name column keys = list(result['Name'][:]) values = list(result['tag'][:]) dictionary2 = dict(zip(keys, values)) dictionary.update(dictionary2) lookuptable = Lookuptable(dictionary) # apply the class above return lookuptable def retrieve_catfile(url=f'{conf.classic_server}/entries/partition_function.html'): """ Simple retrieve index function """ response = requests.get(url) response.raise_for_status() lines = response.text.split("\n") # used to convert '---' to nan def tryfloat(x): try: return float(x) except ValueError: return np.nan # the 'fixed width' table reader fails because there are rows that violate fixed width tbl_rows = [] for row in lines[15:-5]: split = row.split() tag = int(split[0]) molecule_and_lines = row[7:41] molecule = " ".join(molecule_and_lines.split()[:-1]) nlines = int(molecule_and_lines.split()[-1]) partfunc = map(tryfloat, row[41:].split()) partfunc_dict = dict(zip(['lg(Q(1000))', 'lg(Q(500))', 'lg(Q(300))', 'lg(Q(225))', 'lg(Q(150))', 'lg(Q(75))', 'lg(Q(37.5))', 'lg(Q(18.75))', 'lg(Q(9.375))', 'lg(Q(5.000))', 'lg(Q(2.725))'], partfunc)) tbl_rows.append({'tag': tag, 'molecule': molecule, '#lines': nlines, }) tbl_rows[-1].update(partfunc_dict) tbl = table.Table(tbl_rows) # tbl = ascii.read(response.text, header_start=None, data_start=15, data_end=-5, # names=['tag', 'molecule', '#lines', 'lg(Q(1000))', 'lg(Q(500))', 'lg(Q(300))', 'lg(Q(225))', # 'lg(Q(150))', 'lg(Q(75))', 'lg(Q(37.5))', 'lg(Q(18.75))', 'lg(Q(9.375))', 'lg(Q(5.000))', # 'lg(Q(2.725))'], # col_starts=(0, 7, 34, 41, 53, 66, 79, 92, 106, 117, 131, 145, 159, 173), # format='fixed_width', delimiter=' ') return tbl def retrieve_catfile2(url=f'{conf.classic_server}/predictions/catalog/catdir.html'): """ Simple retrieve index function """ response = requests.get(url) response.raise_for_status() try: tbl = ascii.read(response.text, format='html') except UnicodeDecodeError: # based on https://github.com/astropy/astropy/issues/3826#issuecomment-256113937 # which suggests to start with the bytecode content and decode with 'replace errors' text = response.content.decode('ascii', errors='replace') tbl = ascii.read(text, format='html') # delete a junk column (wastes space) del tbl['Catalog'] # for joining - want same capitalization tbl.rename_column("Tag", "tag") # one of these is a unicode dash, the other is a normal dash.... in theory if 'Entry in cm–1' in tbl.colnames: tbl.rename_column('Entry in cm–1', 'Entry') if 'Entry in cm-1' in tbl.colnames: tbl.rename_column('Entry in cm-1', 'Entry') return tbl