"""
This module performs requests with `splus.cloud` server over https
Features
--------
* Download a single rgb image colored using Lupton or Trilogy method
* Download a single fits image of a single band
* Fast batch download of rgb images using multithreading
* Fast batch download of fits images using multithreading
* Single async database query
* Fast batch database queries using multithreading
Authors
-------
Natanael Magalhães Cardoso <natanael.net>
"""
import concurrent.futures
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from multiprocessing import Lock
from pathlib import Path
from time import sleep
from types import FunctionType
from typing import Callable, List, Optional, Sequence, Union
from urllib.parse import quote, urljoin, urlparse

import requests
import tqdm

from mergernet.core.constants import SPLUS_PASS, SPLUS_USER
from mergernet.services.utils import download_file
# Root of the splus.cloud REST API. Relative routes are joined to this url
# with `urljoin`.
BASE_URL = 'https://splus.cloud/api/'

# Authentication endpoint (relative to BASE_URL)
LOGIN_ROUTE = 'auth/login'

# Image endpoints (relative to BASE_URL). The `{...}` placeholders are filled
# with `str.format`.
LUPTON_ROUTE = (
    'get_lupton_image/{ra}/{dec}/{size}'
    '/{r_band}/{g_band}/{b_band}/{stretch}/{Q}'
)
TRILOGY_ROUTE = (
    'get_image/{ra}/{dec}/{size}'
    '/{r_band}-{g_band}-{b_band}/{noise}/{saturation}'
)
FITS_ROUTE = 'get_cut/{ra}/{dec}/{size}/{band}'

# Query string shared by the public and private async TAP endpoints
_TAP_ASYNC_QUERY = (
    'tap/async/?request=doQuery&version=1.0&lang=ADQL&phase=run'
    '&query={sql}&format={fmt}'
)

# TAP endpoints. The leading `/` makes `urljoin` resolve them against the
# host root instead of the `/api/` path of BASE_URL.
PUBLIC_TAP_ROUTE = '/public-TAP/' + _TAP_ASYNC_QUERY
PRIVATE_TAP_ROUTE = '/tap/' + _TAP_ASYNC_QUERY
def update_authorization(f: Callable):
    """
    Decorator that can be placed on methods that need S-PLUS authentication

    Before every call, the decorator asks the instance (the first positional
    argument) to refresh its authorization token through `update_token()`.
    When a new token was issued, the `Authorization` header of the instance's
    http client is replaced before the decorated function runs.

    Parameters
    ----------
    f: Callable
        The function that will be decorated. Its first positional argument
        must expose `update_token()`, `token` and `client`

    Returns
    -------
    Callable
        The decorated function
    """
    # `wraps` keeps the name and docstring of `f`, so introspection and
    # generated documentation still describe the decorated method
    @wraps(f)
    def wrapper(*args, **kwargs):
        this = args[0]
        if this.update_token():
            this.client.headers.update({
                'Authorization': f'Token {this.token["value"]}'
            })
        return f(*args, **kwargs)
    return wrapper
class ImageType(Enum):
    """
    Enum that represents the S-PLUS image types
    """
    # RGB image colored with the Trilogy method
    trilogy = 'trilogy'
    # RGB image colored with the Lupton method
    lupton = 'lupton'
    # Single band FITS image
    fits = 'fits'
class SplusService:
    """
    This service class interacts with splus.cloud server over https

    Parameters
    ----------
    username: str (optional)
        The username used in splus.cloud authentication, defaults to
        SPLUS_USER environment variable
    password: str (optional)
        The password used in splus.cloud authentication, defaults to
        SPLUS_PASS environment variable
    """
    # Shared by every instance: serializes the token renewal between the
    # worker threads spawned by the batch methods
    _lock = Lock()

    def __init__(self, username: str = SPLUS_USER, password: str = SPLUS_PASS):
        self.credentials = {
            'username': username,
            'password': password
        }
        # `None` until the first successful login, then a dict with the keys
        # `value` (the token string) and `timestamp` (when it was issued)
        self.token: Optional[dict] = None
        self.token_duration = timedelta(hours=1)
        self.client = requests.Session()

    def _token_expired(self) -> bool:
        """
        Checks if there is no token or if the cached token is older than
        `token_duration`
        """
        if self.token is None:
            return True
        return self.token['timestamp'] < (datetime.now() - self.token_duration)

    def update_token(self) -> bool:
        """
        Updates splus.cloud authorization token

        A new token is requested only when there is no cached token or when
        the cached one is older than `token_duration`. On success the token
        is cached and the `Authorization` header of the http client is set.

        Returns
        -------
        bool
            `True` if the token was updated and `False` otherwise (cached
            token still valid, or the login request did not return a token)
        """
        if not self._token_expired():
            return False  # using cache

        with SplusService._lock:
            # Another thread may have renewed the token while this one was
            # waiting for the lock, so the expiration is evaluated again
            if not self._token_expired():
                return False  # using cache

            resp = self.client.post(
                self._get_url(LOGIN_ROUTE),
                json=self.credentials
            )
            if resp.status_code != 200:
                return False
            if 'application/json' not in resp.headers.get('Content-Type', ''):
                return False

            resp_body = resp.json()
            if 'token' not in resp_body:
                return False

            self.token = {
                'value': resp_body['token'],
                'timestamp': datetime.now()
            }
            # The header is set while the lock is still held: threads that
            # were waiting see the new token only after the shared client is
            # already authorized
            self.client.headers.update({
                'Authorization': f'Token {self.token["value"]}'
            })
            return True  # updated

    def download_lupton_rgb(
        self,
        ra: float,
        dec: float,
        save_path: Union[str, Path],
        replace: bool = False,
        size: int = 128,
        r_band: str = 'I',
        g_band: str = 'R',
        b_band: str = 'G',
        stretch: Union[int, float] = 3,
        Q: Union[int, float] = 8
    ):
        """
        Downloads a single Lupton RGB image based on object position, this
        method accepts arguments to customize Lupton's parameters

        Parameters
        ----------
        ra: float
            The right ascension in degrees
        dec: float
            The declination in degrees
        save_path: str or Path
            The path where the file will be saved
        replace: bool (optional)
            If `True`, an existing file in `save_path` is overwritten. If
            `False` and a file exists in `save_path`, the download is skipped
        size: int (optional)
            The image size in pixels
        r_band: str (optional)
            The S-PLUS band that will be mapped as R channel of the RGB image
        g_band: str (optional)
            The S-PLUS band that will be mapped as G channel of the RGB image
        b_band: str (optional)
            The S-PLUS band that will be mapped as B channel of the RGB image
        stretch: int or float (optional)
            The `stretch` parameter of Lupton's formula
        Q: int or float (optional)
            The `Q` parameter of Lupton's formula
        """
        self._download_image(
            LUPTON_ROUTE,
            save_path=save_path,
            replace=replace,
            ra=ra,
            dec=dec,
            size=size,
            r_band=r_band,
            g_band=g_band,
            b_band=b_band,
            stretch=stretch,
            Q=Q
        )

    def download_trilogy_rgb(
        self,
        ra: float,
        dec: float,
        save_path: Union[str, Path],
        replace: bool = False,
        size: int = 128,
        r_band: Sequence[str] = ('R', 'I', 'F861', 'Z'),
        g_band: Sequence[str] = ('G', 'F515', 'F660'),
        b_band: Sequence[str] = ('U', 'F378', 'F395', 'F410', 'F430'),
        noise: float = 0.15,
        saturation: float = 0.15
    ):
        """
        Downloads a single Trilogy RGB image based on object position, this
        method accepts arguments to customize Trilogy parameters

        Parameters
        ----------
        ra: float
            The right ascension in degrees
        dec: float
            The declination in degrees
        save_path: str or Path
            The path where the file will be saved
        replace: bool (optional)
            If `True`, an existing file in `save_path` is overwritten. If
            `False` and a file exists in `save_path`, the download is skipped
        size: int (optional)
            The image size in pixels
        r_band: sequence of str (optional)
            The S-PLUS bands that will be mapped as R channel of the RGB image
        g_band: sequence of str (optional)
            The S-PLUS bands that will be mapped as G channel of the RGB image
        b_band: sequence of str (optional)
            The S-PLUS bands that will be mapped as B channel of the RGB image
        noise: int or float (optional)
            The `noise` parameter of Trilogy algorithm
        saturation: int or float (optional)
            The `saturation` parameter of Trilogy algorithm
        """
        self._download_image(
            TRILOGY_ROUTE,
            ra=ra,
            dec=dec,
            save_path=save_path,
            replace=replace,
            size=size,
            # the route expects each channel as a comma separated band list
            r_band=','.join(r_band),
            g_band=','.join(g_band),
            b_band=','.join(b_band),
            noise=noise,
            saturation=saturation
        )

    def download_fits(
        self,
        ra: float,
        dec: float,
        save_path: Union[str, Path],
        replace: bool = False,
        size: int = 128,
        band: str = 'R'
    ):
        """
        Downloads a single FITS image based on object position

        Parameters
        ----------
        ra: float
            The right ascension in degrees
        dec: float
            The declination in degrees
        save_path: str or Path
            The path where the file will be saved
        replace: bool (optional)
            If `True`, an existing file in `save_path` is overwritten. If
            `False` and a file exists in `save_path`, the download is skipped
        size: int (optional)
            The image size in pixels
        band: str (optional)
            The S-PLUS band of the fits file
        """
        self._download_image(
            FITS_ROUTE,
            ra=ra,
            dec=dec,
            save_path=save_path,
            replace=replace,
            size=size,
            band=band
        )

    def batch_image_download(
        self,
        ra: List[float],
        dec: List[float],
        save_path: List[Union[str, Path]],
        img_type: ImageType = ImageType.lupton,
        workers: Optional[int] = None,
        **kwargs
    ):
        """
        Downloads a batch of images (Lupton RGB, Trilogy RGB or FITS) based on
        objects positions using multiple threads

        Parameters
        ----------
        ra: list of float
            A list of right ascension in degrees
        dec: list of float
            A list of declination in degrees
        save_path: list of str or list of Path
            A list of paths where the files will be saved
        img_type: ImageType (optional)
            Can be `ImageType.lupton`, `ImageType.trilogy` or
            `ImageType.fits`. Defaults to `ImageType.lupton`
        workers: int (optional)
            The number of worker threads. When `None`, the default pool size
            of `ThreadPoolExecutor` is used
        kwargs: Any (optional)
            Any optional parameter of `download_lupton_rgb` if `img_type` is
            `ImageType.lupton`, or any optional parameter of
            `download_trilogy_rgb` if `img_type` is `ImageType.trilogy` or any
            optional parameter of `download_fits` if `img_type` is
            `ImageType.fits`.
            These parameters must have the same type as the mentioned
            functions, i.e. pass a single value instead of a list of values
            and all images will be downloaded with the same parameter

        Raises
        ------
        ValueError
            If `ra`, `dec` and `save_path` have different lengths or if
            `img_type` is not a supported image type

        Examples
        --------
        >> ra = [172.4, 193.9, 63.3]
        >> dec = [0.42, 2.63, -1.24]
        >> paths = ['fig1.png', 'fig2.png', 'fig3.png']
        >> service.batch_image_download(ra, dec, paths, ImageType.trilogy,
        ...                             size=256, noise=0.2, saturation=0.2)
        """
        # explicit check instead of `assert`, which is stripped under `-O`
        # and would let `zip` silently truncate the batch
        if not (len(ra) == len(dec) == len(save_path)):
            raise ValueError('ra, dec and save_path must have the same length')

        downloaders = {
            ImageType.fits: self.download_fits,
            ImageType.lupton: self.download_lupton_rgb,
            ImageType.trilogy: self.download_trilogy_rgb,
        }
        try:
            download_function = downloaders[img_type]
        except (KeyError, TypeError):
            raise ValueError(f'Unsupported image type: {img_type!r}') from None

        download_args = [
            {'ra': _ra, 'dec': _dec, 'save_path': Path(_save_path), **kwargs}
            for _ra, _dec, _save_path in zip(ra, dec, save_path)
        ]

        self._batch_download(
            download_function=download_function,
            download_args=download_args,
            workers=workers
        )

    @update_authorization
    def query(
        self,
        sql: str,
        save_path: Union[str, Path],
        replace: bool = False,
        scope: str = 'public',
        fmt: str = 'text/csv'
    ):
        """
        Sends a single query to splus.cloud database

        The query is submitted as an async TAP job; this method then follows
        the job until it finishes and saves its output in `save_path`.

        Parameters
        ----------
        sql: str
            The sql query string
        save_path: str or Path
            The path where the query output will be saved
        replace: bool (optional)
            If `True`, an existing file in `save_path` is overwritten. If
            `False` and a file exists in `save_path`, the download is
            skipped. Defaults to `False`
        scope: str (optional)
            The splus.cloud scope. Can be `public` or `private`. Use
            `private` only if you are assigned as collaborator. Defaults to
            `public`
        fmt: str (optional)
            The mime-type of query output. Defaults to `text/csv`

        Examples
        --------
        >> service.query('SELECT TOP 10 * FROM dr1.all_dr1', 'query.csv')
        """
        # any scope other than `public` is sent to the private endpoint
        route = PUBLIC_TAP_ROUTE if scope == 'public' else PRIVATE_TAP_ROUTE
        url = self._get_url(route, {'sql': quote(sql), 'fmt': fmt})

        resp = self.client.post(
            url,
            headers={
                'Accept': 'application/json',
                'Content-Type': 'application/x-www-form-urlencoded'
            }
        )

        if resp.status_code == 200:
            # `resp.url` is the final url of the response, used as job url
            self._track_tap_job(url=resp.url, save_path=save_path, replace=replace)
        else:
            # same reporting used while tracking the job
            print(f'Status code: {resp.status_code}')

    def batch_query(
        self,
        sql: List[str],
        save_path: List[Union[str, Path]],
        replace: bool = False,
        scope: str = 'public',
        fmt: str = 'text/csv',
        workers: Optional[int] = None
    ):
        """
        Sends a batch of queries to splus.cloud database

        Parameters
        ----------
        sql: list of str
            The sql query strings
        save_path: list of str or list of Path
            The paths where the queries outputs will be saved
        replace: bool (optional)
            If `True`, an existing file in `save_path` is overwritten. If
            `False` and a file exists in `save_path`, the download is
            skipped. Defaults to `False`
        scope: str (optional)
            The splus.cloud scope. Can be `public` or `private`. Use
            `private` only if you are assigned as collaborator. Defaults to
            `public`
        fmt: str (optional)
            The mime-type of query output. Defaults to `text/csv`
        workers: int (optional)
            The number of worker threads. When `None`, the default pool size
            of `ThreadPoolExecutor` is used

        Raises
        ------
        ValueError
            If `sql` and `save_path` have different lengths
        """
        if len(sql) != len(save_path):
            raise ValueError('sql and save_path must have the same length')

        args = [
            {
                'sql': _sql,
                'save_path': _save_path,
                'replace': replace,
                'scope': scope,
                'fmt': fmt
            }
            for _sql, _save_path in zip(sql, save_path)
        ]

        self._batch_download(
            download_function=self.query,
            download_args=args,
            workers=workers
        )

    def _get_url(self, route: str, params: Optional[dict] = None) -> str:
        """
        Get the full url based on params

        Parameters
        ----------
        route: str
            The route template, joined to `BASE_URL`
        params: dict (optional)
            The values used to fill the `{...}` placeholders of `route`

        Returns
        -------
        str
            The full url
        """
        return urljoin(BASE_URL, route.format(**(params or {})))

    @update_authorization
    def _track_tap_job(self, url: str, save_path: Union[str, Path], replace: bool):
        """
        Tracks the async query status in splus.cloud database

        The job is polled every 5 seconds while it is queued or executing and
        its destruction time was not reached. When the job is completed, its
        first result is downloaded to `save_path`. Errors are printed.

        Parameters
        ----------
        url: str
            The job url
        save_path: str or Path
            The path where the query output will be saved
        replace: bool
            If `True`, an existing file in `save_path` is overwritten. If
            `False` and a file exists in `save_path`, the download is skipped
        """
        while True:
            resp = self.client.get(url, headers={'Accept': 'application/json'})

            if resp.status_code != 200:
                print(f'Status code: {resp.status_code}')
                return

            data = resp.json()
            phase = data['phase']

            if phase in ('QUEUED', 'EXECUTING'):
                # the timestamp ends with `Z`, replaced by an explicit UTC
                # offset that `fromisoformat` is able to parse
                destruction_time = datetime.fromisoformat(
                    data['destruction'][:-1] + '+00:00'
                )
                if destruction_time <= datetime.now(destruction_time.tzinfo):
                    return  # the job expired before finishing
                sleep(5)
            elif phase == 'COMPLETED':
                # the result is always fetched from the splus.cloud host
                result_url = urlparse(data['results'][0]['href'])
                result_url = result_url._replace(netloc='splus.cloud').geturl()
                download_file(
                    url=result_url,
                    save_path=save_path,
                    replace=replace,
                    http_client=self.client
                )
                return
            elif phase == 'ERROR':
                print((data.get('error') or {}).get('message', ''))
                return
            else:
                # any other phase is not tracked
                return

    @update_authorization
    def _download_image(
        self,
        route: str,
        save_path: Union[str, Path],
        replace: bool = False,
        **kwargs
    ):
        """
        Generic download method for splus.cloud images

        This method performs a generic two stages download, required by
        splus.cloud: the first request returns a json with the `filename` of
        the image and the second request downloads this file. Nothing is
        downloaded if the first request does not return a json with status
        code 200.

        Parameters
        ----------
        route: str
            The image request route
        save_path: str or Path
            The path where the file will be saved
        replace: bool (optional)
            If `True`, an existing file in `save_path` is overwritten. If
            `False` and a file exists in `save_path`, the download is
            skipped. Defaults to `False`
        kwargs: Any
            The values used to fill the placeholders of `route`
        """
        # `save_path` can be a str, which has no `exists` method
        save_path = Path(save_path)
        if not replace and save_path.exists():
            return

        # Stage 1 request
        url = self._get_url(route, kwargs)
        resp = self.client.get(url)
        if resp.status_code != 200:
            return
        if 'application/json' not in resp.headers.get('Content-Type', ''):
            return

        file_url = self._get_url(resp.json()['filename'])

        # Stage 2 request
        download_file(
            file_url,
            save_path=save_path,
            replace=replace,
            http_client=self.client
        )

    def _batch_download(
        self,
        download_function: Callable,
        download_args: List[dict],
        workers: Optional[int] = None
    ):
        """
        Generic batch download method.

        This method receives a download function and performs a multi-thread
        execution, showing a progress bar. A failure in one download is
        reported and does not interrupt the remaining downloads.

        Parameters
        ----------
        download_function: function
            The download function that will be spawned in multiple threads.
            Falls back to `download_file` when a falsy value is given
        download_args: list of dict
            The list of keyword arguments of `download_function`, one dict
            for each call
        workers: int (optional)
            The number of worker threads. When `None`, the default pool size
            of `ThreadPoolExecutor` is used
        """
        download_function = download_function or download_file

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(download_function, **args)
                for args in download_args
            ]

            for future in tqdm.tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                unit=' files'
            ):
                try:
                    future.result()
                except Exception as error:
                    # best effort: report the failure without breaking the
                    # progress bar and keep the remaining downloads running
                    tqdm.tqdm.write(f'Download failed: {error!r}')
if __name__ == '__main__':
    # Manual smoke test: sends one small query to splus.cloud using the
    # default credentials and saves the output in `table0.csv`
    service = SplusService()

    # Sample positions and output paths used by the image examples below
    ra = [11.5933851, 11.8345742, 11.9053378, 12.1397573, 12.3036425]
    dec = [-1.0180862, -0.8710110, -0.8707459, -0.7373196, -0.4088959]
    path1 = ['1.png', '2.png', '3.png', '4.png', '5.png']
    path2 = ['01.png', '02.png', '03.png', '04.png', '05.png']
    path3 = ['1.fits', '2.fits', '3.fits', '4.fits', '5.fits']
    # service.batch_image_download(ra, dec, path1, img_type=ImageType.lupton, workers=3, replace=True)
    # service.batch_image_download(ra, dec, path2, img_type=ImageType.trilogy, workers=3, replace=True)
    # service.batch_image_download(ra, dec, path3, img_type=ImageType.fits, workers=3, replace=True)

    sql = ["SELECT TOP 100 ID, RA, DEC FROM dr3.all_dr3 where id like '%HYDRA%'"]
    # path4 = [f'table{i}.csv' for i in range(10)]
    path4 = ['table0.csv']
    service.batch_query(sql, save_path=path4, replace=True, workers=2)