root/src/vfs/extfs/helpers/torrent.in

/* [previous][next][first][last][top][bottom][index][help]  */
#! @PYTHON@
"""Torrent Virtual FileSystem for Midnight Commander
 
The script requires Midnight Commander 3.1+
(http://www.midnight-commander.org/) and Python 2.7+ (http://www.python.org/).
The module eff_bdecode.py (from effbot.org/zone/bencode.htm) is now included
in the code.
 
For mc 4.7+ just put the script in $HOME/[.local/share/].mc/extfs.d.
For older versions put it in /usr/[local/][lib|share]/mc/extfs
and add a line "torrent" to the /usr/[local/][lib|share]/mc/extfs/extfs.ini.
Make the script executable.
 
For mc 4.7+ run this "cd" command in the Midnight Commander (in the "bindings"
file the command is "%cd"): cd file/torrent://; In older versions it is
cd file#torrent, where "file" is the name of your torrent metafile.
 
See detailed installation instructions at
https://phdru.name/Software/mc/torrent_INSTALL.html.
 
The VFS lists all files and directories from the torrent metafile; all files
appear empty, of course, but the sizes are shown. Filenames are reencoded from
the metafile's encoding/codepage to the current locale.
 
Along with the files/directories in the torrent metafile the VFS also presents
meta information - in the form of files in .META directory. The size and
contents of these files are taken from the corresponding fields in the torrent
metafile. The script doesn't check whether the torrent itself contains a .META
file or directory (quite unlikely).
 
Date/time for all directories/files is set to the value of the 'creation date'
field, if it exists; if not, date/time is set to the last modification time of
the torrent file itself.
 
The filesystem is, naturally, read-only.
 
"""
 
# Module metadata. The version, author and copyright strings are interpolated
# into the usage banner logged when the script is run with too few arguments.
__version__ = "1.4.1"
__author__ = "Oleg Broytman <phd@phdru.name>"
__copyright__ = "Copyright (C) 2010-2025 PhiloSoft Design"
__license__ = "GPL"
 
 
from datetime import datetime
from os.path import dirname, getmtime
import os
import subprocess
import sys
from time import localtime, asctime
 
if 'MC_TEST_EXTFS_LIST_CMD' in os.environ:
    from time import gmtime as localtime
else:
    from time import localtime
 
# Work out ``default_encoding``: the charset used to encode listing output
# and tried as a last resort when decoding the metafile.
try:
    import locale
except ImportError:
    use_locale = False
else:
    use_locale = True

if not use_locale:
    default_encoding = sys.getdefaultencoding()
else:
    # locale.getdefaultlocale() is only consulted on interpreters older than
    # 3.11; newer ones go straight to getpreferredencoding().
    lcAll = []
    if sys.version_info[:2] < (3, 11):
        try:
            lcAll = locale.getdefaultlocale()
        except locale.Error:
            # A broken locale setup is not fatal, fall through to the
            # next source of information.
            lcAll = []

    if len(lcAll) != 2:
        try:
            default_encoding = locale.getpreferredencoding()
        except locale.Error:
            default_encoding = sys.getdefaultencoding()
    else:
        # (language code, encoding) pair; the encoding part may be None.
        default_encoding = lcAll[1]

if default_encoding is None:
    default_encoding = "utf-8"
 
import logging

# All diagnostics of this helper go to stderr through a dedicated logger;
# stdout is reserved for the listing produced for Midnight Commander.
log_err_handler = logging.StreamHandler(sys.stderr)
logger = logging.getLogger('torrent-mcextfs')
logger.setLevel(logging.INFO)
logger.addHandler(log_err_handler)
 
# The rest of the script reads the command from sys.argv[1] and the path of
# the torrent metafile from sys.argv[2]; with fewer arguments there is nothing
# to do, so log a short banner (version, author, copyright) and exit with
# status 1.
if len(sys.argv) < 3:
    logger.critical("""\
Torrent Virtual FileSystem for Midnight Commander version %s
Author: %s
%s
 
This is not a program. Put the script in $HOME/[.local/share/].mc/extfs.d or
/usr/[local/][lib|share]/mc/extfs. For more information read the source!""",
                    __version__, __author__, __copyright__)
    sys.exit(1)
 
# Switch to the user's locale. The call is guarded because ``locale`` is only
# bound when its import succeeded (``use_locale``), and because setlocale()
# raises locale.Error when the environment names an unsupported locale; in
# that case the process simply stays in the default "C" locale, matching the
# silent handling of locale errors during encoding detection.
if use_locale:
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        pass
 
# True when running under Python 3 or newer.
PY3 = sys.version_info[0] >= 3

if not PY3:
    def output(s):
        """Write one line to stdout (Python 2: ``s`` is written as is)."""
        sys.stdout.write(s + '\n')
else:
    def output(s):
        """Write one line to stdout.

        Under Python 3 the text is encoded with ``default_encoding``
        (unencodable characters are replaced) and written to the binary
        buffer of stdout.
        """
        encoded = s.encode(default_encoding, 'replace')
        sys.stdout.buffer.write(encoded + b'\n')
 
# ---------- eff_bdecode.py ----------
# effbot.org/zone/bencode.htm
#
# Copyright (C) 1995-2013 by Fredrik Lundh
#
# By obtaining, using, and/or copying this software and/or its associated
# documentation, you agree that you have read, understood, and will comply with
# the following terms and conditions:
#
# Permission to use, copy, modify, and distribute this software and its
# associated documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appears in all copies, and that both
# that copyright notice and this permission notice appear in supporting
# documentation, and that the name of Secret Labs AB or the author not be used
# in advertising or publicity pertaining to distribution of the software
# without specific, written prior permission.
#
# SECRET LABS AB AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS
# SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN
# NO EVENT SHALL SECRET LABS AB OR THE AUTHOR BE LIABLE FOR ANY SPECIAL,
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
 
from functools import partial
import re
 
 
def tokenize(text, match=re.compile(b"([idel])|(\\d+):|(-?\\d+)").match):
    """Split bencoded bytes into a stream of tokens.

    Structural markers ("i", "d", "l", "e") and integer literals are yielded
    as text strings.  A byte string ``<len>:<payload>`` is yielded as the
    virtual token "s" followed by the raw payload bytes.

    If the input cannot be matched at some position, the regex match is None
    and the resulting AttributeError propagates to the caller.
    """
    pos = 0
    end = len(text)
    while pos < end:
        found = match(text, pos)
        token = found.group(found.lastindex)
        pos = found.end()
        if found.lastindex != 2:
            yield token.decode('ascii')
            continue
        # Group 2 is the length prefix of a string: emit the marker and
        # then the payload that follows the colon.
        size = int(token)
        yield "s"
        yield text[pos:pos + size]
        pos += size
 
 
def decode_item(next_, token):
    """Decode one bencoded value whose first token is ``token``.

    ``next_`` is a zero-argument callable returning the next token of the
    stream.  Returns an int, the raw string payload, a list or a dict.
    Raises ValueError for an unknown leading token or an integer that is not
    closed by "e".
    """
    if token == "s":
        # string: "s" value (virtual tokens)
        return next_()

    if token == "i":
        # integer: "i" value "e"
        number = int(next_())
        if next_() != "e":
            raise ValueError
        return number

    if token not in ("l", "d"):
        raise ValueError

    # container: "l" (or "d") values "e"
    items = []
    current = next_()
    while current != "e":
        items.append(decode_item(next_, current))
        try:
            current = next_()
        except StopIteration:
            # The stream ended before the closing "e": keep what was read.
            break
    if token == "d":
        # A dictionary is a flat key, value, key, value, ... sequence.
        return dict(zip(items[0::2], items[1::2]))
    return items
 
 
def eff_bdecode(text):
    """Decode a complete bencoded byte string and return the decoded value.

    Raises SyntaxError("trailing junk") when tokens remain after the first
    complete value, and SyntaxError("syntax error") when tokenizing or
    decoding fails.
    """
    nothing_left = object()
    try:
        tokens = tokenize(text)
        fetch = partial(next, tokens)
        result = decode_item(fetch, next(tokens))
        # look for more tokens
        if next(tokens, nothing_left) is not nothing_left:
            raise SyntaxError("trailing junk")
    except (AttributeError, ValueError, StopIteration):
        raise SyntaxError("syntax error")
    return result
# ---------- /eff_bdecode.py ----------
 
 
def mctorrent_list():
    """List the entire VFS.

    Writes one ``ls -l``-like line per directory and per file through
    ``output()``: first the directories, then the torrent's files and the
    synthetic ``.META/*`` files that expose fields of the metafile.

    The 'name.utf-8' / 'path.utf-8' variants are preferred over the plain
    'name' / 'path' fields when present.  All entries share one timestamp:
    the 'creation date' field if the metafile has one, otherwise the
    modification time of the metafile (sys.argv[2]).

    Exits through ``torrent_error()`` when a mandatory field (name, path or
    length) is missing.
    """
    info = torrent['info']
    if 'name' not in info and 'name.utf-8' not in info:
        torrent_error('Unknown name')

    # Only one of the two name fields is guaranteed to exist, so neither may
    # be read with a plain subscription.
    name_utf8 = info.get('name.utf-8')
    root = name_utf8 if name_utf8 else info.get('name', '')

    dt = None
    if 'files' in info:
        paths = []
        for entry in info['files']:
            if 'path' not in entry and 'path.utf-8' not in entry:
                torrent_error('Unknown path')
            if 'length' not in entry:
                torrent_error('Unknown length')
            # Path fields are expected to be lists of path components.
            if 'path.utf-8' in entry:
                components = entry['path.utf-8']
            else:
                components = entry['path']
            paths.append(('/'.join([root] + components), entry['length']))
    else:  # One-file torrent
        if 'length' not in info:
            torrent_error('Unknown length')
        paths = [(root, info['length'])]

    # Sizes of the .META files are the lengths of the text that
    # mctorrent_copyout() writes for the same field.
    meta = []
    for field in 'announce', 'announce-list', 'codepage', 'comment', \
                 'created by', 'creation date', 'encoding', \
                 'nodes', 'publisher', 'publisher-url':
        if field == 'comment' and 'comment.utf-8' in torrent:
            data = torrent['comment.utf-8']
            meta.append(('.META/' + field, len(data)))
        elif field in torrent:
            if field == 'announce-list':
                data = decode_announce_list(torrent[field])
            elif field == 'creation date':
                dt = torrent[field]
                data = decode_datetime_asc(dt)
                dt = decode_datetime(dt)
            elif field == 'nodes':
                # Each node is expected to be a (host, port) pair.
                data = '\n'.join('%s:%s' % (host, port)
                                 for host, port in torrent[field])
            else:
                # str() keeps non-string values (e.g. a numeric codepage)
                # from breaking len().
                data = str(torrent[field])
            meta.append(('.META/' + field, len(data)))

    if 'private' in info:
        meta.append(('.META/private', len(str(info['private']))))

    if 'piece length' in info:
        meta.append(('.META/piece length', len(str(info['piece length']))))

    paths += meta
    dirs = set()
    for path, _size in paths:
        if '/' in path:
            dirs.add(dirname(path))

    if not dt:
        dt = decode_datetime(getmtime(sys.argv[2]))

    for path in sorted(dirs):
        output("dr-xr-xr-x 1 0 0 0 %s %s" % (dt, path))

    for path, size in sorted(paths):
        output("-r--r--r-- 1 0 0 %d %s %s" % (size, dt, path))
 
 
def mctorrent_copyout():
    """Extract a file from the VFS.

    sys.argv[3] is the name of the file inside the VFS and sys.argv[4] the
    real file to write.  Files of the torrent itself are written empty (the
    metafile carries no content); ``.META/*`` files receive the text of the
    corresponding metafile field.

    Exits through ``torrent_error()`` when the requested ``.META`` file is
    unknown or its field is absent from the metafile.
    """
    torrent_filename = sys.argv[3]
    real_filename = sys.argv[4]
    data = None

    for name in 'announce', 'announce-list', 'codepage', 'comment', \
                'created by', 'creation date', 'encoding', \
                'nodes', 'publisher', 'publisher-url':
        # Only the requested field may set ``data``; otherwise an unknown
        # .META name could be answered with the contents of another field.
        if torrent_filename != '.META/' + name:
            continue
        if name == 'comment' and 'comment.utf-8' in torrent:
            data = torrent['comment.utf-8']
        elif name in torrent:
            if name == 'announce-list':
                data = decode_announce_list(torrent[name])
            elif name == 'creation date':
                data = decode_datetime_asc(torrent[name])
            elif name == 'nodes':
                # Each node is expected to be a (host, port) pair.
                data = '\n'.join('%s:%s' % (host, port)
                                 for host, port in torrent[name])
            else:
                data = str(torrent[name])
        else:
            torrent_error('Unknown ' + name)
        break

    if torrent_filename in ('.META/private', '.META/piece length'):
        info = torrent['info']
        if torrent_filename == '.META/private':
            if 'private' not in info:
                torrent_error('Private absent')
        if torrent_filename == '.META/piece length':
            if 'piece length' not in info:
                torrent_error('Piece length absent')
        data = str(info[torrent_filename[len('.META/'):]])

    if not torrent_filename.startswith('.META/'):
        # A regular file of the torrent: there is no content to extract.
        data = ''

    if data is None:
        torrent_error('Unknown file name')
    else:
        # The context manager closes the file even if the write fails.
        with open(real_filename, 'wt') as outfile:
            outfile.write(data)
 
 
def mctorrent_copyin():
    """Put a file to the VFS: always refused, the filesystem is read-only."""
    message = ("Torrent VFS doesn't support adding/overwriting files "
               "(read-only filesystem)")
    sys.exit(message)
 
 
def mctorrent_rm():
    """Remove a file from the VFS: always refused, it is read-only."""
    message = ("Torrent VFS doesn't support removing files/directories "
               "(read-only filesystem)")
    sys.exit(message)


# Removing a directory is refused in exactly the same way.
mctorrent_rmdir = mctorrent_rm
 
 
def mctorrent_mkdir():
    """Create a directory in the VFS: always refused, it is read-only."""
    message = ("Torrent VFS doesn't support creating directories "
               "(read-only filesystem)")
    sys.exit(message)
 
 
def torrent_error(error_str):
    """Log a metafile parsing error and terminate with exit status 1.

    This function never returns to its caller.
    """
    logger.critical("Error parsing the torrent metafile: %s", error_str)
    raise SystemExit(1)
 
 
def decode_dict(d, encoding):
    """Return a copy of ``d`` with keys and byte values decoded to text.

    Keys are always decoded with ``encoding``.  Values are decoded when they
    are bytes, processed recursively when they are dicts or lists, and kept
    unchanged otherwise.  The input dictionary is not modified.
    """
    decoded = {}
    for raw_key, value in d.items():
        key = raw_key.decode(encoding)
        if isinstance(value, bytes):
            value = value.decode(encoding)
        elif isinstance(value, list):
            value = decode_list(value, encoding)
        elif isinstance(value, dict):
            value = decode_dict(value, encoding)
        decoded[key] = value
    return decoded
 
 
def decode_list(l, encoding):
    """Return a copy of list ``l`` with byte items decoded to text.

    Bytes are decoded with ``encoding``, nested dicts and lists are
    processed recursively, and any other item is kept unchanged.
    """
    decoded = []
    for item in l:
        if isinstance(item, bytes):
            item = item.decode(encoding)
        elif isinstance(item, list):
            item = decode_list(item, encoding)
        elif isinstance(item, dict):
            item = decode_dict(item, encoding)
        decoded.append(item)
    return decoded
 
 
def decode_torrent():
    """Read and decode the torrent metafile.

    The raw data is the output of the command named by the
    MC_TEST_EXTFS_LIST_CMD environment variable when that is set, otherwise
    the contents of the file sys.argv[2].  The large 'pieces' entry of the
    info dictionary is dropped because nothing in the VFS uses it.

    Under Python 3 every key and byte string is decoded to text, using the
    metafile's 'encoding' or 'codepage' field when present and otherwise the
    first of ascii, utf-8 and ``default_encoding`` that works.  Under
    Python 2 the decoded structure is returned as is.

    Exits through ``torrent_error()`` when the data cannot be read, is not
    valid bencode, has no info dictionary or cannot be decoded to text.
    """
    helper = os.environ.get('MC_TEST_EXTFS_LIST_CMD')
    try:
        if helper is not None:
            data = subprocess.check_output(helper.split())
        else:
            with open(sys.argv[2], 'rb') as torrent_file:
                data = torrent_file.read()
        torrent = eff_bdecode(data)
    except (IOError, OSError, SyntaxError,
            subprocess.CalledProcessError) as error_str:
        # eff_bdecode() reports malformed input as SyntaxError.
        torrent_error(error_str)

    # Validate before touching the info dictionary so that a metafile
    # without it yields a clean error instead of a KeyError.
    if not isinstance(torrent, dict) or b'info' not in torrent:
        torrent_error('Info absent')
    info = torrent[b'info']
    if not isinstance(info, dict):
        torrent_error('Info is not a dictionary')
    info.pop(b'pieces', None)

    if not PY3:
        return torrent

    declared = torrent.get(b'encoding') or torrent.get(b'codepage')
    if declared:
        # A codepage may be stored as an integer rather than a string.
        if isinstance(declared, bytes):
            declared = declared.decode('ascii', 'replace')
        candidates = (str(declared),)
        failure = 'Cannot decode the metafile using encoding %s' % declared
    else:
        candidates = ('ascii', 'utf-8', default_encoding)
        failure = 'UnicodeDecodeError'

    for candidate in candidates:
        try:
            return decode_dict(torrent, candidate)
        except (UnicodeDecodeError, LookupError):
            # Wrong or unknown encoding: try the next candidate, if any.
            pass
    torrent_error(failure)
 
 
def decode_datetime_asc(dt):
    """Format timestamp ``dt`` (seconds since the epoch) like asctime().

    ``dt`` may be anything float() accepts.  When the value cannot be
    converted to a time on this platform, the text of the largest
    representable datetime is returned instead.
    """
    try:
        return asctime(localtime(float(dt)))
    except (ValueError, OverflowError, OSError):
        # Out-of-range timestamps raise OverflowError or OSError rather
        # than ValueError on Python 3.
        return datetime.max.ctime()
 
 
def decode_datetime(dt):
    """Format timestamp ``dt`` as "MM-DD-YYYY hh:mm" for the VFS listing.

    ``dt`` may be anything float() accepts.  Years above 9999 are clamped to
    9999.  When the value cannot be converted to a time on this platform,
    the ctime() text of the largest representable datetime is returned.
    """
    try:
        Y, m, d, H, M = localtime(float(dt))[0:5]
    except (ValueError, OverflowError, OSError):
        # Out-of-range timestamps raise OverflowError or OSError rather
        # than ValueError on Python 3.
        return datetime.max.ctime()
    if Y > 9999:
        Y = 9999
    return "%02d-%02d-%d %02d:%02d" % (m, d, Y, H, M)
 
 
def decode_announce_list(announce):
    """Return the first item of every non-empty entry, one per line."""
    first_items = [tier[0] for tier in announce if tier]
    return '\n'.join(first_items)
 
 
# Dispatch: the command name given in sys.argv[1] selects the module-level
# function mctorrent_<command>.
command = sys.argv[1]
procname = "mctorrent_" + command

g = globals()
if procname not in g:
    logger.critical("Unknown command %s", command)
    sys.exit(1)

# The command handlers read the decoded metafile from this global.
torrent = decode_torrent()

try:
    g[procname]()
except SystemExit:
    raise
except Exception:
    logger.exception("Error during run")
    # Report the failure to the caller; without this the script would end
    # with exit status 0 after an unexpected error.
    sys.exit(1)

/* [previous][next][first][last][top][bottom][index][help]  */