- process_files(files, sjclient, logger)
- retrieve_urls(urls, chunk_size=200)
- @retry
- @memoize
- chunker(seq, size)
- @hang
- parse_sj_date(dt, date_format="iso", relative_to=None)
- archive_file(fp, filename, meta=None, temp=False, async_mode=True)
- parse_document(fp, doc_type)
- send_email(to, subject, body_text, cc=None, attachments=None)
- @callable(job=False, as_admin=False)
- account_config
- get_proxy()
- get_max_dates_for_series_query(query)
- get_max_dates_for_series_ids(sids)
- lock(key='', owner_id=None)
- create_lookup_obj(xtralookup_obj=None)
- Expressions specific functions
- @expressionable(apply_operators=False)
- xpr_fields(sid, fields)
- clear_xpr_fields()
- xpr_add_query_for_facets(query)
- xpr_query(sid, query)
- sjs(series_query, df=None, dt=None, max_points=0)
- sjsordf(query, max_series=None, key_field='series_id', df=None, dt=None, max_points=0, fields=None, sort=None, err_if_total_higher=True)
- sjdf(query, max_series=None, key_field='series_id', df=None, dt=None, max_points=0, fields=None, sort=None, err_if_total_higher=True)
- SJVirt specific functions
- invoke_sjvirt_source_func(func_name, vrt_obj, *args, **kwargs)
- Alerts specific functions
- raise_series_alert(series_id, rule_id, issue_id=None, at=None, alert_details=None)
- unset_series_issue(series_id, rule_id, issue_id, at=None)
- Subpages
process_files(files, sjclient, logger)
Processes a list of files from the processor settings. Yields a tuple of path and open file object.
Example of settings:
{
  "files": [
    // If the file was uploaded via an uploader,
    // it is guaranteed to contain:
    {
      "file_id": "d149ce3f-0e04-41a3-b749-573c0c8ab76d",
      "name": "Metadata_Master_20210208.xlsx"
    },
    // But different configurations are possible:
    {"url": "https://example.com/file"},
    {"path": "/tmp/file"}
  ]
}
Parameters:
- files (default: settings['files']) - list of dicts with the file information
- sjclient (default: the sjclient initialized in the processor) - Shooju connection client
- logger (default: the logger initialized in the processor) - logger available to processors
retrieve_urls(urls, chunk_size=200)
Retrieves multiple URLs in parallel.
Parameters:
- urls (required) - list of urls to retrieve
- chunk_size (default: 200) - number of parallel urls to retrieve
Yields tuples of (url, dict) where dict is in the form:
{
"text": "encoded text of page",
"content": "raw content",
"headers": {...},
"status_code": 200,
"encoding": "encoding",
"url": "url in headers"
}
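A minimal sketch of consuming the yielded tuples, assuming only the dict shape shown above. split_by_status is a hypothetical helper (not part of sjutils), treating any 2xx status as success is an assumption, and the sample data stands in for a real sjutils.retrieve_urls(urls) call.

```python
def split_by_status(results):
    """Separate (url, response_dict) tuples into successes and failures."""
    ok, failed = {}, {}
    for url, resp in results:
        # Assumption of this sketch: any 2xx status code counts as success.
        if 200 <= resp["status_code"] < 300:
            ok[url] = resp["text"]
        else:
            failed[url] = resp["status_code"]
    return ok, failed


# Stand-in for: results = sjutils.retrieve_urls(urls)
sample_results = [
    ("https://example.com/a", {"text": "page a", "status_code": 200}),
    ("https://example.com/b", {"text": "", "status_code": 404}),
]
ok, failed = split_by_status(sample_results)
```

Because results are yielded, they can be processed as they arrive rather than after every URL has been retrieved.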
@retry
A decorator. Retries the decorated function using an exponential backoff. This decorator can be called without keyword arguments. Useful for wrapping steps in an importer that sometimes break because of source issues, like FTP too-many-users errors or unreliable APIs.
Parameters:
- times (default: 5) - number of times to try before giving up
- delay (default: 3) - initial delay between retries in seconds
- backoff (default: 2) - backoff multiplier e.g. value of 2 will double the delay
- exception_to_check (default: Exception) - the exception to check (an Exception class or a tuple of them); any other exception is re-raised immediately without a retry.
Example:
@sjutils.retry(times=10)
def something_dangerous():
    ...
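To illustrate how the times, delay and backoff parameters interact, here is a minimal sketch of an exponential-backoff decorator. It is not the sjutils implementation: retry_sketch and its sleep argument are hypothetical, and it assumes times counts the total number of attempts.

```python
import functools
import time


def retry_sketch(times=5, delay=3, backoff=2, exception_to_check=Exception,
                 sleep=time.sleep):
    """Illustrative retry decorator with exponential backoff (not the sjutils code)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except exception_to_check:
                    if attempt == times:
                        raise  # out of attempts: propagate the last error
                    sleep(wait)
                    wait *= backoff  # e.g. 3s, 6s, 12s, ...
        return wrapper
    return decorator


waits = []  # records the delays instead of actually sleeping
calls = {"n": 0}


@retry_sketch(times=4, delay=3, backoff=2, exception_to_check=IOError,
              sleep=waits.append)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("temporary source failure")
    return "ok"


result = flaky()
```

The function fails twice and succeeds on the third attempt, so the recorded delays are 3 and then 6 seconds.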
@memoize
A decorator. Caches function calls based on all *args and **kwargs. Useful for wrapping calculation-intensive or I/O intensive operations that yield the same results (e.g. requests to a remote lookup).
Example:
@sjutils.memoize
def somethinghard(param1, param2):
    ...  # some work
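The caching behaviour can be sketched as follows. This is an illustration only, not the sjutils code: memoize_sketch and remote_lookup are hypothetical names, and the sketch assumes all arguments are hashable.

```python
import functools


def memoize_sketch(func):
    """Cache results keyed on positional and keyword arguments (hashable only)."""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Sort kwargs so the key does not depend on keyword order.
        key = (args, tuple(sorted(kwargs.items())))
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]
    return wrapper


lookups = []  # tracks how often the expensive function really runs


@memoize_sketch
def remote_lookup(code, region="us"):
    lookups.append((code, region))
    return f"{region}:{code}"


first = remote_lookup("abc", region="eu")
second = remote_lookup("abc", region="eu")  # served from the cache
```

Only the first call reaches the function body; the second is answered from the cache.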
chunker(seq, size)
Splits the sequence seq into chunks of size size.
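A minimal sketch of the chunking behaviour, assuming the last chunk may be shorter when the length is not a multiple of size. chunker_sketch is a hypothetical stand-in, not the sjutils code.

```python
def chunker_sketch(seq, size):
    """Yield consecutive slices of seq, each holding at most size items."""
    for start in range(0, len(seq), size):
        yield seq[start:start + size]


chunks = list(chunker_sketch([1, 2, 3, 4, 5], 2))
```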
@hang
A decorator. Sets up a periodic task that runs for a limited time. The return value is ignored. If a function execution takes longer than run_every, the corresponding scheduled run is skipped.
Parameters:
- hang_for (required) - hang time in seconds
- run_every (required) - task run period in seconds
Example:
@sjutils.hang(hang_for=60*60*3, run_every=5*60)
def poll_updates_from_source():
    ...
def process():
    poll_updates_from_source()
    logger.info("done")  # will be done in 3 hours
In the above example, the poll_updates_from_source() function is called every 5 minutes for 3 hours.
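The scheduling and skipping rule can be illustrated with a small simulation. This is only a sketch under stated assumptions: simulate_hang is hypothetical, it assumes the first run starts at time 0, and it models "skipped" as dropping any tick that arrives while the previous run is still busy. The real scheduler may differ.

```python
def simulate_hang(hang_for, run_every, durations):
    """Return the start times (in seconds) at which the task would run.

    durations gives how long each successive run takes; missing values
    are treated as instantaneous runs.
    """
    starts = []
    busy_until = 0
    durations = iter(durations)
    for tick in range(0, hang_for, run_every):
        if tick < busy_until:
            continue  # previous run overran run_every: skip this tick
        starts.append(tick)
        busy_until = tick + next(durations, 0)
    return starts


# 3 hours, every 5 minutes, instantaneous runs.
on_time = simulate_hang(60 * 60 * 3, 5 * 60, [])
# The first run takes 7 minutes, so the tick at 300 seconds is skipped.
with_overrun = simulate_hang(20 * 60, 5 * 60, [7 * 60])
```

Under these assumptions the 3-hour example yields 36 runs, and a 7-minute first run causes the second scheduled run to be skipped.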
parse_sj_date(dt, date_format="iso", relative_to=None)
Parses a date in the given format and returns Unix epoch milliseconds.
Parameters:
- dt (required) - date to be parsed (can be a string, float, int or None)
- date_format - date format of the date to be parsed (iso, unix, datetime)
- relative_to - start point for relative dates. If not passed then relative delta is relative to utcnow().
Examples:
sjutils.parse_sj_date('-1d')
sjutils.parse_sj_date(datetime(2001, 1, 1))
sjutils.parse_sj_date('2001-01-01')
sjutils.parse_sj_date('-1Mb', relative_to=datetime(2001, 1, 1))
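To make the return value concrete, the sketch below converts a datetime to Unix epoch milliseconds using only the standard library. to_epoch_millis is a hypothetical helper, and treating naive datetimes as UTC is an assumption (suggested by the utcnow() default above), not confirmed sjutils behaviour. Relative strings such as '-1d' are Shooju-specific and are not emulated here.

```python
from datetime import datetime, timezone


def to_epoch_millis(dt):
    """Convert a datetime to Unix epoch milliseconds, treating naive values as UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


millis = to_epoch_millis(datetime(2001, 1, 1))
from_iso = to_epoch_millis(datetime.fromisoformat("2001-01-01"))
```

Both calls give 978307200000, the millisecond timestamp of midnight UTC on 1 January 2001.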
archive_file(fp, filename, meta=None, temp=False, async_mode=True)
Uploads file content to Shooju using the POST /files API. Updates metadata if provided. Returns None in async mode, True/False upload status in sync mode. Useful for archiving source content that might not be available in the future. When async_mode=False, returns the unique file_id of the file.
More discussion (SJ Internal docs): https://shooju.atlassian.net/wiki/pages/createpage.action?spaceKey=INT&title=Archived%20files .
Parameters:
- fp (required) - file-like object
- filename (required) - filename; does not have to be unique
- meta (default: None) - metadata dictionary
- temp (default: False) - indicates that file will be automatically removed in a certain period of time
- async_mode (default: True) - performs upload asynchronously
parse_document(fp, doc_type)
Extracts text data from doc, docx, pdf, ppt, pptx files.
Parameters:
- fp (required) - file-like object
- doc_type (required) - file doc type represented by its extension (doc, docx...)
send_email(to, subject, body_text, cc=None, attachments=None)
Sends an email message to a given recipient. Raises an error if failed to send a message due to any cause.
Parameters:
- to (required) - A list of recipient(s) email address(es)
- subject (required) - Email message subject
- body_text (required) - Email message body text
- cc - Optional list of email message "carbon copy" recipients
- attachments - Optional array of attachments. Must be array of (<name>, <file-like object>) tuples.
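A minimal sketch of preparing the arguments, in particular the (name, file-like object) attachment tuples. The addresses and file content are made up for illustration, and using a binary BytesIO object as the file-like object is an assumption.

```python
import io

# Each attachment is a (name, file-like object) tuple.
csv_bytes = "series_id,value\nabc,1.5\n".encode("utf-8")
attachments = [("report.csv", io.BytesIO(csv_bytes))]

email_kwargs = {
    "to": ["analyst@example.com"],  # a list, even for a single recipient
    "subject": "Daily import report",
    "body_text": "See the attached report.",
    "cc": ["team@example.com"],
    "attachments": attachments,
}
# Inside a processor this would be passed as: sjutils.send_email(**email_kwargs)
```

Since send_email raises on failure, wrap the call in try/except if a failed notification should not stop the processor.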
@callable(job=False, as_admin=False)
Functions wrapped by this decorator can be called via the /<processor>/call/<func> API.
Parameters:
- job (default False) - Creates a new job, so that the datastore object can write data when the function is called.
- as_admin (default False) - If true, sjclient and datastore will make API calls with admin permissions rather than with the permissions of the calling user.
Example:
@sjutils.callable(job=True)
def foo():
    ...
account_config
A property. Returns account-specific settings, e.g. aggs_default_e, localize_default_field.
get_proxy()
Returns a random proxy config. The proxy config can be passed as the requests proxies parameter as is.
The first random proxy returned may still fail, so it can be useful to combine this function with the sjutils.retry decorator.
get_max_dates_for_series_query(query)
Returns the max written point dates of the series matching the given series query.
Parameters:
- query (required) - Series query
get_max_dates_for_series_ids(sids)
Returns the max written point dates of the series with the given series ids.
Parameters:
- sids (required) - Array of series ids
lock(key='', owner_id=None)
Context manager that acquires an account-wide global lock. Use it when independent processes need to be synchronised.
Parameters:
- key - the lock name. It can be thought of as the id of the object to lock (e.g. a series id, or the name of some object that must be controlled exclusively).
- owner_id - the id of the object that acquires the lock. The same key cannot be locked by several different owners at once. By default, owner_id is the id of the current processor.
Examples:
# the code of processor xyz that atomically updates some series
with sjutils.lock("some_object", "xyz"):
    ...  # do some manipulation
# processor abc
with sjutils.lock("some_object", "abc"):  # this will block while xyz is doing its job
    ...  # do some manipulation
The function is a factory that returns a redis_lock.Lock() instance. For advanced usage, refer to the library documentation.
create_lookup_obj(xtralookup_obj=None)
Creates a dictionary from xtra lookups.
Parameters:
- xtralookup_obj - xtralookup_obj param from the processor dictionary. Can pass another processor’s xtralookup_obj like:
sjutils.create_lookup_obj(sjclient.raw.get('/processors/xxx')['importer']['xtralookup_obj'])
Or within a processor:
sjutils.create_lookup_obj(processor['xtralookup_obj'])
Expressions specific functions
@expressionable(apply_operators=False)
A decorator. Any custom expression function must be wrapped by this decorator; otherwise the expression engine will not allow that function to be used.
The apply_operators parameter makes @pre operators (see "Operators Order") be applied as if they were @post operators. This is necessary when a function receives a query string instead of a series and you want operators to work with it:
=PROCESSOR.function_without_apply_operators('query')@lag:5y # @lag will never be applied
=PROCESSOR.function_without_apply_operators('query')@post@lag:5y # @lag will be applied
=PROCESSOR.function_with_apply_operators('query')@lag:5y # @lag will be applied
xpr_fields(sid, fields)
Notifies the expression engine that this series was used in the expression, so that the engine can correctly generate expression fields. Note that this will not add fields for saved xpr series.
Parameters:
- sid (required) - Series id
- fields (required) - Series fields.
clear_xpr_fields()
Clears all collected xpr fields data.
xpr_add_query_for_facets(query)
Notifies the expression engine about queries used in the expression, so that it can correctly generate facets.
The query must not contain operators.
xpr_query(sid, query)
Notifies the expression engine that this series was used in the expression, so that the engine can correctly generate the expression sub-query (used mostly for the /cart feature).
Parameters:
- sid (required) - Series id
- query (required) - Series query.
sjs(series_query, df=None, dt=None, max_points=0)
Returns a pandas.Series based on a series query.
Parameters:
- series_query (required) - Series query
- df - Datetime to start the Series; can be MIN, MAX, or datetime()
- dt - Datetime to end the Series; can be MIN, MAX, or datetime()
- max_points - Maximum number of points to retrieve; use 0 to not retrieve points
- include_job - Indicates that point job_id should be included in /series api response
sjsordf(query, max_series=None, key_field='series_id', df=None, dt=None, max_points=0, fields=None, sort=None, err_if_total_higher=True)
Returns a pandas.DataFrame, or a pandas.Series if the result contains a single series, based on a Shooju query. This is the function that is actually called when {{series query}} is used in an expression.
Parameters:
- query (required) - Series query
- max_series (default 100) - Maximum number of series to return as columns in the DataFrame
- key_field (default series_id) - Field to use as the column header in the returned DataFrame
- df - Datetime to start the DataFrame; can be MIN, MAX, or datetime()
- dt - Datetime to end the DataFrame; can be MIN, MAX, or datetime()
- max_points - Maximum number of points to retrieve; use 0 to not retrieve points
- fields - A list of fields to pull from series
- include_job - Indicates that point job_id should be included in /series api response
- sort - Sort order to pass to api /series?query= call
- err_if_total_higher (default True) - Raises an error if number of series returned by query is higher than max_series
sjdf(query, max_series=None, key_field='series_id', df=None, dt=None, max_points=0, fields=None, sort=None, err_if_total_higher=True)
Returns a pandas.DataFrame based on a Shooju query.
Parameters:
- query (required) - Series query
- max_series (default 100) - Maximum number of series to return as columns in the DataFrame
- key_field (default series_id) - Field to use as the column header in the returned DataFrame
- df - Datetime to start the DataFrame; can be MIN, MAX, or datetime()
- dt - Datetime to end the DataFrame; can be MIN, MAX, or datetime()
- max_points - Maximum number of points to retrieve; use 0 to not retrieve points
- fields - A list of fields to pull from series
- include_job - Indicates that point job_id should be included in /series api response
- sort - Sort order to pass to api /series?query= call
- err_if_total_higher (default True) - Raises an error if number of series returned by query is higher than max_series
SJVirt specific functions
invoke_sjvirt_source_func(func_name, vrt_obj, *args, **kwargs)
Invokes custom SJVirt source function.
Parameters:
- func_name (required) - Source function name
- vrt_obj (required) - Series `vrt` field value
- args - Function positional arguments
- kwargs - Function keyword arguments
Alerts specific functions
raise_series_alert(series_id, rule_id, issue_id=None, at=None, alert_details=None)
Sets an alert on a series. When an alert is raised, all alert subscribers are notified.
Parameters:
- series_id (required) - Series id that triggered an alert
- rule_id (required) - Rule id that was violated. All its subscribers will be notified.
- issue_id - Issue id to set on series (meta.issue)
- at - Event timestamp. Can be Unix epoch milliseconds or a datetime() object
- alert_details - Alert details mapping. Contains subject, details and show_only_details keys
Example:
from datetime import datetime

# sid, rule_id, issue_id and THRESHOLD (in seconds) are assumed to be defined by the processor
points = sjclient.get_series(f"sid={sid}", max_points=1, df='MAX', dt='MIN')['points']
last_point = points[0]
if (datetime.utcnow() - last_point.datetime).total_seconds() > THRESHOLD:
    sjutils.raise_series_alert(sid, rule_id, issue_id, alert_details={
        'details': 'No updates since {}'.format(last_point.datetime.isoformat())
    })
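The staleness check and the alert_details mapping can be separated into a small helper that is easy to test without a Shooju connection. This is a sketch: stale_alert_details is a hypothetical helper, and the subject text and the boolean type of show_only_details are assumptions; only the three key names come from the parameter description above.

```python
from datetime import datetime, timedelta


def stale_alert_details(last_point_dt, now, threshold_seconds):
    """Return an alert_details mapping if the last point is too old, else None."""
    age = (now - last_point_dt).total_seconds()
    if age <= threshold_seconds:
        return None
    return {
        "subject": "Series is stale",  # made-up subject text
        "details": "No updates since {}".format(last_point_dt.isoformat()),
        "show_only_details": False,  # assumed to be a boolean flag
    }


now = datetime(2021, 2, 8, 12, 0, 0)
fresh = stale_alert_details(now - timedelta(minutes=5), now, 3600)
stale = stale_alert_details(now - timedelta(hours=2), now, 3600)
```

A non-None result could then be passed as the alert_details argument of sjutils.raise_series_alert.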
unset_series_issue(series_id, rule_id, issue_id, at=None)
Unsets the alert issue flag on a series (meta.issue). This means the issue is resolved and the alert is no longer triggering.
Parameters:
- series_id (required) - Id of the series whose issue is being unset
- rule_id (required) - Id of the rule that was violated.
- issue_id (required) - Issue id to unset on the series (meta.issue)
- at - Event timestamp. Can be Unix epoch milliseconds or a datetime() object