Source code for dgitcore.datasets.common

#!/usr/bin/env python
"""
This is the core module for manipulating the dataset metadata
"""
import os, sys, copy, fnmatch, re, shutil
import yaml, json, tempfile, mimetypes
import webbrowser, traceback, collections
import subprocess, string, random, pipes
from collections import OrderedDict
import shelve, getpass
from datetime import datetime
from hashlib import sha256
import mimetypes
import platform
import uuid, shutil
import boto3, glob2
import subprocess
from dateutil import parser
try:
    from urllib.parse import urlparse
except:
    from urlparse import urlparse
from ..config import get_config
from ..plugins.common import plugins_get_mgr
from ..helper import bcolors, clean_str, cd, compute_sha256, run, clean_name
from ..exceptions import *
from .detect import get_schema
from .history import get_history, get_diffs
from .validation import validate

#####################################################
# Exports
#####################################################

__all__ = [
    'lookup',
    'list_repos',
    'shellcmd',
    'log', 'show', 'push', 'pull', 'commit',
    'stash', 'drop', 'status', 'post',
    'clone', 'init', 'diff',
    'remote'
]

#####################################################
# Repo independent commands...
#####################################################
def lookup(username, reponame):
    """
    Find a repo given the owner's username and the repo's name

    Parameters
    ----------

    username: Name of the user
    reponame: Name of the repo

    Returns whatever the git repo manager's lookup yields.
    """
    # XXX This should be generalized to all repo managers.
    git_manager = plugins_get_mgr().get(what='repomanager', name='git')
    return git_manager.lookup(username=username, reponame=reponame)
def list_repos(remote=False):
    """
    List the repos known to the git repo manager

    Parameters
    ----------

    remote: Flag. Only the falsy (local) case is handled; a truthy
            value raises an Exception.
    """
    plugin_mgr = plugins_get_mgr()

    if remote:
        raise Exception("Not supported yet")

    git_manager = plugin_mgr.get(what='repomanager', name='git')
    known = git_manager.get_repo_list()
    # Sort in place and hand back the very list the manager returned
    known.sort()
    return known
##################################################### # Repo specific generic commands #####################################################
def shellcmd(repo, args):
    """
    Execute a shell command inside the `cd(repo.rootdir)` context

    Parameters
    ----------

    repo: Repository object
    args: Shell command

    Returns the value produced by the `run` helper.
    """
    with cd(repo.rootdir):
        return run(args)
def datapackage_exists(repo):
    """
    Tell whether datapackage.json is present under the repo's root
    directory.
    """
    return os.path.exists(os.path.join(repo.rootdir, "datapackage.json"))

#####################################################
# Repo specific simple commands
#####################################################
def generic_repo_cmd(repo, cmd, *args):
    """
    Forward a command name and its positional arguments to repo.run
    and return the result unchanged.
    """
    # print("Running generic command", cmd, args)
    outcome = repo.run(cmd, *args)
    return outcome
def log(repo, args=None):
    """
    Log of the changes executed until now

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'log', args)
def show(repo, args=None):
    """
    Show commit details

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'show', args)
def remote(repo, args=None):
    """
    Show remote

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'remote', args)
def push(repo, args=None):
    """
    Push changes to the backend

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'push', args)
def pull(repo, args=None):
    """
    Pull changes from the backend

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'pull', args)
def commit(repo, args=None):
    """
    Commit changes to the data repository

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'commit', args)
def drop(repo, args=None):
    """
    Drop the repository (new to dgit)

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'drop', args)
def stash(repo, args=None):
    """
    Stash the changes

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'stash', args)
def diff(repo, args=None):
    """
    Diff between versions

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command (defaults to an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'diff', args)
def status(repo, args=None):
    """
    Show status of the repo

    Parameters
    ----------

    repo: Repository object (result of lookup)
    args: Parameters to be passed to git status command (defaults to
          an empty list)
    """
    # Build a fresh list per call; a literal [] default would be one
    # shared object that the callee could mutate across invocations.
    if args is None:
        args = []
    return generic_repo_cmd(repo, 'status', args)
def delete(repo, args=None):
    """
    Delete files (not implemented yet)

    Parameters
    ----------

    repo: Repository object
    args: Arguments to git command

    Raises
    ------

    NotImplementedError: always. Deleting files would leave
        datapackage.json out of sync with the files on disk.
    """
    message = """Delete is not yet implemented completely. datapackage.json should be updated to keep in sync with files on disk."""

    # NotImplemented is a comparison sentinel, not an exception;
    # NotImplementedError is the exception meant for this situation.
    raise NotImplementedError(message)

    # TODO: intended implementation once metadata syncing exists:
    #   1. generic_repo_cmd(repo, 'delete', args) to clean up the repo
    #   2. sync repo.package['resources'] with the files on disk
    #   3. write repo.package to a temp file and re-add it through
    #      repo.run('add_files', ...) as 'datapackage.json'

#####################################################
# Initialize a repo
#####################################################
def bootstrap_datapackage(repo, force=False,
                          options=None, noinput=False):
    """
    Create the initial datapackage metadata for a repo

    The metadata is stored on `repo.package` and also written as JSON
    to a temporary file; the caller is responsible for removing it.

    Parameters
    ----------

    repo: Repository object (username, reponame and remoteurl are read)
    force: Unused here; kept for interface compatibility
    options: Dictionary providing 'title' and 'description'. When it is
             None the values are asked for interactively.
    noinput: When True, never prompt; raise instead if options is None

    Returns
    -------

    Path of the temporary file holding the JSON metadata

    Raises
    ------

    IncompleteParameters: options is None and noinput is set
    """

    print("Bootstrapping datapackage")

    # Initial data package json
    package = OrderedDict([
        ('title', ''),
        ('description', ''),
        ('username', repo.username),
        ('reponame', repo.reponame),
        ('name', str(repo)),
        ('keywords', []),
        ('resources', []),
        ('creator', getpass.getuser()),
        ('createdat', datetime.now().isoformat()),
        ('remote-url', repo.remoteurl)
    ])

    if options is not None:
        package['title'] = options['title']
        package['description'] = options['description']
    else:
        if noinput:
            raise IncompleteParameters("Option field with title and description")

        for var in ['title', 'description']:
            value = ''
            # Keep asking until something non-empty is entered
            while value in ['', None]:
                value = input('Your Repo ' + var.title() + ": ")
                if len(value) == 0:
                    print("{} cannot be empty. Please re-enter.".format(var.title()))

            package[var] = value

    # Now store the package...
    (handle, filename) = tempfile.mkstemp()
    # Wrap the descriptor mkstemp already opened so that it gets
    # closed, instead of opening the path a second time.
    with os.fdopen(handle, 'w') as fd:
        fd.write(json.dumps(package, indent=4))

    repo.package = package

    return filename
def init(username, reponame, setup,
         force=False, options=None, noinput=False):
    """
    Initialize an empty repository with datapackage.json

    Parameters
    ----------

    username: Name of the user
    reponame: Name of the repo
    setup: Specify the 'configuration' (git only, git+s3 backend etc)
    force: Force creation of the files
    options: Dictionary with content of dgit.json, if available.
    noinput: Automatic operation with no human interaction

    Returns
    -------

    The repository object created by the git repo manager
    """

    mgr = plugins_get_mgr()
    repomgr = mgr.get(what='repomanager', name='git')

    backendmgr = None
    if setup == 'git+s3':
        backendmgr = mgr.get(what='backend', name='s3')

    repo = repomgr.init(username, reponame, force, backendmgr)

    # Now bootstrap the datapackage.json metadata file and copy it in...

    # Insert a gitignore with .dgit directory in the repo. This
    # directory will be used to store partial results
    (handle, gitignore) = tempfile.mkstemp()
    # Reuse the descriptor mkstemp opened so it is closed properly
    with os.fdopen(handle, 'w') as fd:
        fd.write(".dgit")

    filename = None
    try:
        # Try to bootstrap. If you cant, cleanup and return
        try:
            filename = bootstrap_datapackage(repo, force, options, noinput)
        except Exception:
            repomgr.drop(repo, [])
            raise

        repo.run('add_files',
                 [
                     {
                         'relativepath': 'datapackage.json',
                         'localfullpath': filename,
                     },
                     {
                         'relativepath': '.gitignore',
                         'localfullpath': gitignore,
                     },
                 ])
    finally:
        # Cleanup temp files, on failure as well as on success
        if filename is not None:
            os.unlink(filename)
        os.unlink(gitignore)

    args = ['-a', '-m', 'Bootstrapped the repo']
    repo.run('commit', args)

    return repo
def clone(url):
    """
    Clone a URL and make sure the result carries a datapackage.json.
    Examples include:

        - git@github.com:pingali/dgit.git
        - https://github.com:pingali/dgit.git
        - s3://mybucket/git/pingali/dgit.git

    Parameters
    ----------

    url: URL of the repo

    Raises InvalidParameters when a backend manager is found and it
    rejects the URL.
    """
    # Pick the backend from the URL prefix; s3 is checked first
    if url.startswith('s3'):
        backendtype = 's3'
    elif url.startswith(("http", "git")):
        backendtype = 'git'
    else:
        backendtype = None

    plugin_mgr = plugins_get_mgr()
    git_manager = plugin_mgr.get(what='repomanager', name='git')
    backend_manager = plugin_mgr.get(what='backend', name=backendtype)

    # print("Testing {} with backend {}".format(url, backendmgr))
    if backend_manager is not None:
        if not backend_manager.url_is_valid(url):
            raise InvalidParameters("Invalid URL")

    repo_key = git_manager.clone(url, backend_manager)
    repo = git_manager.lookup(key=repo_key)

    # Nothing more to do when the clone already has its metadata
    if datapackage_exists(repo):
        return repo

    # Insert a datapackage since it doesnt already exist...
    metadata_file = bootstrap_datapackage(repo)
    additions = [
        {
            'relativepath': 'datapackage.json',
            'localfullpath': metadata_file,
        },
    ]
    repo.run('add_files', additions)
    os.unlink(metadata_file)
    repo.run('commit', ['-a', '-m', 'Bootstrapped the repo'])

    return repo
###########################################################
# Post metadata to a server
###########################################################
def annotate_metadata_data(repo, task, patterns, size=0):
    """
    Update metadata with the content of the files

    Parameters
    ----------

    repo: Repository object
    task: 'preview' stores leading file content under 'content';
          'schema' stores the result of get_schema under 'schema'.
          Any other value leaves the resources untouched.
    patterns: Patterns handed to repo.find_matching_files
    size: Number of leading characters kept for a preview
    """

    # A set keeps the membership test cheap for large repos
    matching_files = set(repo.find_matching_files(patterns))
    rootdir = repo.rootdir
    for f in repo.package['resources']:
        relativepath = f['relativepath']
        if relativepath not in matching_files:
            continue

        path = os.path.join(rootdir, relativepath)
        if task == 'preview':
            print("Adding preview for ", relativepath)
            # Close the file promptly and read only what is needed.
            # A negative size keeps the slicing behaviour
            # (everything but the last characters).
            with open(path) as fd:
                if size >= 0:
                    f['content'] = fd.read(size)
                else:
                    f['content'] = fd.read()[:size]
        elif task == 'schema':
            print("Adding schema for ", path)
            f['schema'] = get_schema(path)

def annotate_metadata_code(repo, files):
    """
    Update metadata with the commit information

    Parameters
    ----------

    repo: Repository object
    files: Filename patterns; each one is globbed as "**/<pattern>"
    """

    package = repo.package
    package['code'] = []
    for p in files:
        matching_files = glob2.glob("**/{}".format(p))
        for f in matching_files:
            absf = os.path.abspath(f)
            print("Add commit data for {}".format(f))
            package['code'].append(OrderedDict([
                ('script', f),
                ('permalink', repo.manager.permalink(repo, absf)),
                ('mimetypes', mimetypes.guess_type(absf)[0]),
                ('sha256', compute_sha256(absf))
            ]))

def annotate_metadata_platform(repo):
    """
    Update metadata host information
    """

    print("Added platform information")
    package = repo.package
    mgr = plugins_get_mgr()
    repomgr = mgr.get(what='instrumentation', name='platform')
    package['platform'] = repomgr.get_metadata()

def annotate_metadata_diffs(repo):
    """
    Run get_diffs over the package history from within the repo's
    root directory
    """

    print("Computing diffs")
    with cd(repo.rootdir):
        get_diffs(repo.package['history'])

def annotate_metadata_validation(repo):
    """
    Attach validation results to the resources they target
    """

    print("Adding validation information")

    # Collect the validation results by relativepath
    results = validate(repo)
    fileresults = {}
    for r in results:
        fileresults.setdefault(r['target'], []).append(r)

    for r in repo.package['resources']:
        path = r['relativepath']
        if path in fileresults:
            r['validation'] = fileresults[path]

def annotate_metadata_dependencies(repo):
    """
    Collect information from the dependent repo's

    Each entry of repo.options['dependencies'] must look like
    "username/reponame". Malformed entries and repos that cannot be
    looked up are reported and skipped.

    Returns an empty list when no dependencies are configured, and
    None otherwise (the result is stored in
    repo.package['dependencies']).
    """

    options = repo.options
    if 'dependencies' not in options:
        print("No dependencies")
        return []

    repos = []
    for d in options['dependencies']:
        parts = d.split("/")
        if len(parts) != 2:
            # Skip the entry; unpacking it would raise ValueError
            print("Invalid dependency specification")
            continue
        (username, reponame) = parts
        try:
            repos.append(repo.manager.lookup(username, reponame))
        except Exception:
            print("Repository does not exist. Please create one", d)

    package = repo.package
    package['dependencies'] = [
        {
            'username': r.username,
            'reponame': r.reponame,
        }
        for r in repos
    ]
def post(repo, args=None):
    """
    Post to metadata server

    Pipeline details and the annotations requested under the
    'metadata-management' option are added to the repo's package
    first; the repo is then posted through every configured metadata
    plugin. Failures are reported on stdout rather than raised.

    Parameters
    ----------

    repo: Repository object (result of lookup)
    args: Unused; kept for interface compatibility
    """

    mgr = plugins_get_mgr()
    keys = mgr.search(what='metadata')
    keys = keys['metadata']

    if len(keys) == 0:
        return

    # Incorporate pipeline information...
    if 'pipeline' in repo.options:
        for name, details in repo.options['pipeline'].items():
            patterns = details['files']
            matching_files = repo.find_matching_files(patterns)
            matching_files.sort()
            # The patterns are replaced by the files they resolved to
            details['files'] = matching_files
            for i, f in enumerate(matching_files):
                r = repo.get_resource(f)
                if 'pipeline' not in r:
                    r['pipeline'] = []
                r['pipeline'].append(name + " [Step {}]".format(i))

    if 'metadata-management' in repo.options:

        print("Collecting all the required metadata to post")
        metadata = repo.options['metadata-management']

        # Add data repo history
        if 'include-data-history' in metadata and metadata['include-data-history']:
            repo.package['history'] = get_history(repo.rootdir)

        # Add content previews
        if 'include-preview' in metadata:
            annotate_metadata_data(repo,
                                   task='preview',
                                   patterns=metadata['include-preview']['files'],
                                   size=metadata['include-preview']['length'])

        if 'include-schema' in metadata:
            annotate_metadata_data(repo, task='schema',
                                   patterns=metadata['include-schema'])

        if 'include-code-history' in metadata:
            annotate_metadata_code(repo, files=metadata['include-code-history'])

        if 'include-platform' in metadata:
            annotate_metadata_platform(repo)

        if 'include-validation' in metadata:
            annotate_metadata_validation(repo)

        if 'include-dependencies' in metadata:
            annotate_metadata_dependencies(repo)

        # Diffs are computed from the history, so they need it present
        history = repo.package.get('history', None)
        if (('include-tab-diffs' in metadata) and
                metadata['include-tab-diffs'] and
                history is not None):
            annotate_metadata_diffs(repo)

        # Insert options as well
        # NOTE(review): nesting reconstructed from a flattened listing;
        # confirm this belongs inside the 'metadata-management' branch.
        repo.package['config'] = repo.options

    try:
        for k in keys:
            # print("Key", k)
            metadatamgr = mgr.get_by_key('metadata', k)
            url = metadatamgr.url
            o = urlparse(url)
            print("Posting to ", o.netloc)
            response = metadatamgr.post(repo)
            if isinstance(response, str):
                print("Error while posting:", response)
            elif response.status_code in [400]:
                content = response.json()
                print("Error while posting:")
                # Separate name so the plugin key `k` is not shadowed
                for field in content:
                    print(" ", field, "- ", ",".join(content[field]))
    except NetworkError:
        print("Unable to reach metadata server!")
    except NetworkInvalidConfiguration as e:
        print("Invalid network configuration in the INI file")
        # Exceptions carry no `message` attribute by default on
        # Python 3; fall back to the exception itself.
        print(getattr(e, 'message', e))
    except Exception as e:
        print("Could not post. Unknown error")
        print(e)