[prev in list] [next in list] [prev in thread] [next in thread] 

List:       cifs-protocol
Subject:    [cifs-protocol] MS-XCA: RTL compression is available via Python on Windows
From:       Douglas Bagnall via cifs-protocol <cifs-protocol () lists ! samba ! org>
Date:       2022-12-27 23:55:43
Message-ID: 3ffdd82f-60af-0c89-0bb4-53c1625e4dc0 () catalyst ! net ! nz
[Download RAW message or body]

This is *not* a dochelp question, but a follow-up to a couple of unresolved 
issues in recent MS-XCA threads that might be of interest to cifs-protocol and 
perhaps Jeff and Obaid who answered the questions.

In https://lists.samba.org/archive/cifs-protocol/2022-October/003821.html 
("[MS-XCA] is LZ77 + Huffman the same as the Win32 compression API? - 
TrackingID#2210190040006868"), I wrote of the ntifs-rtl functions:

> I haven't been able to compile and link a test utility using this API, which 
> seems to be available in kernel mode only.
> 
> I think it is possible, but you need to compile the thing as a driver, rather 
> than a plain executable.
> 
> Some of the trouble is no doubt that I have been muddling around with emacs and 
> gcc in various kinds of Cygwin, MinGW, MSYS2, and WSL, rather than committing to 
> learning Visual Studio and its ecosystem.

Well, it turns out you can access them through Python on Windows, using the 
standard ctypes module. As attached at the end. I didn't need to compile 
anything, let alone learn VS.

The idea came from 
https://github.com/coderforlife/ms-compress/blob/master/test/compressors.py.


Secondly, once I had access to RTL via Python, I looked again at a file that I 
had constructed that didn't use LZ-77 matches, and which wouldn't decompress 
using the compress.h API. That's the thread ending at 
https://lists.samba.org/archive/cifs-protocol/2022-November/003885.html 
("[MS-XCA] LZ77+ Huffman: questions about blocks - 
TrackingID#2211140040009096"), where Jeff says:

> Please reach out to us at our DocHelp alias if you want to give us files that won't \
> decompress, then we can trace through and tell you why. 

But with the Python RTL bindings, the file *does* decompress, so that bug is out 
of scope for us all.

What follows is the Python code mentioned above.

cheers,
Douglas


-------------8<---------------------------------------------
# Generate test vectors for Windows LZ77 Huffman compression.
#
# Copyright (c) 2022 Catalyst IT
#
# GPLv3+.
#
# This uses the Python ctypes module to access the lower level RTL
# compression functions.

import sys
import argparse
from ctypes import create_string_buffer, byref, windll
from ctypes.wintypes import USHORT, ULONG, LONG, PULONG, LPVOID, CHAR
NTSTATUS = LONG


METHODS = {
     'LZNT1': 2,
     'XPRESS_PLAIN': 3,
     'XPRESS_HUFF': 4,
     '2': 2,
     '3': 3,
     '4': 4
}


class RtlError(Exception):
     pass


def ntstatus_check(status, f, args):
     # 0x117 is STATUS_BUFFER_ALL_ZEROS
     status &= 0xffffffff
     if status in (0, 0x117):
         return status
     msg = {
         0xC0000023: "buffer too small",
         0xC0000242: "bad compression data",
     }.get(status, '')

     raise RtlError(f'NTSTATUS: {status:08X} {msg}')


def wrap(f, result, *args):
     f.restype = result
     f.argtypes = args
     f.errcheck = ntstatus_check
     return f


CompressBuffer = wrap(windll.ntdll.RtlCompressBuffer, NTSTATUS,
                       USHORT, LPVOID, ULONG, LPVOID, ULONG, ULONG, PULONG,
                       LPVOID)


GetCompressionWorkSpaceSize = wrap(windll.ntdll.RtlGetCompressionWorkSpaceSize,
                                    NTSTATUS,
                                    USHORT, PULONG, PULONG)


DecompressBufferEx = wrap(windll.ntdll.RtlDecompressBufferEx,
                           NTSTATUS,
                           USHORT, LPVOID, ULONG, LPVOID, ULONG, PULONG, LPVOID)


def compress(data, format, effort=0):
     flags = USHORT(format | effort)
     workspace_size = ULONG(0)
     fragment_size = ULONG(0)
     comp_len = ULONG(0)
     GetCompressionWorkSpaceSize(flags,
                                 byref(workspace_size),
                                 byref(fragment_size))
     workspace = create_string_buffer(workspace_size.value)
     output_len = len(data) * 9 // 8 + 260
     output_buf = bytearray(output_len)
     CompressBuffer(flags,
                    (CHAR * 1).from_buffer(data), len(data),
                    (CHAR * 1).from_buffer(output_buf), output_len,
                    4096,
                    byref(comp_len),
                    workspace)
     return output_buf[:comp_len.value]


def decompress(data, format, target_size=None):
     flags = USHORT(format)
     workspace_size = ULONG(0)
     fragment_size = ULONG(0)
     decomp_len = ULONG(0)
     GetCompressionWorkSpaceSize(flags,
                                 byref(workspace_size),
                                 byref(fragment_size))
     workspace = create_string_buffer(workspace_size.value)
     if target_size is None:
         output_len = len(data) * 10
     else:
         output_len = target_size
     output_buf = bytearray(output_len)

     DecompressBufferEx(format,
                        (CHAR * 1).from_buffer(output_buf), len(output_buf),
                        (CHAR * 1).from_buffer(data), len(data),
                        byref(decomp_len),
                        workspace)
     return output_buf[:decomp_len.value]


def main():
     if sys.getwindowsversion().major < 7:
         print("this probably won't work on your very old version of Windows\n"
               "but we'll try anyway!", file=sys.stderr)

     parser = argparse.ArgumentParser()
     parser.add_argument('-d', '--decompress', action='store_true',
                         help='decompress instead of compress')
     parser.add_argument('-m', '--method', default='XPRESS_HUFF',
                         choices=list(METHODS.keys()),
                         help='use this compression method')
     parser.add_argument('-e', '--extra-effort', action='store_true',
                         help='use extra effort to compress')

     parser.add_argument('-s', '--decompressed-size', type=int,
                         help=('decompress to this size '
                               '(required for XPRESS_HUFF'))

     parser.add_argument('-o', '--output',
                         help='write to this file')
     parser.add_argument('-i', '--input',
                         help='read data from this file')

     args = parser.parse_args()

     method = METHODS[args.method]

     if all((args.decompress,
             args.decompressed_size is None,
             method == 4)):
         print("a size is required for XPRESS_HUFF decompression")
         sys.exit(1)

     with open(args.input, 'rb') as f:
         data = bytearray(f.read())

     if args.decompress:
         output = decompress(data, method, args.decompressed_size)
     else:
         effort = 1 if args.extra_effort else 0
         output = compress(data, method, effort)

     with open(args.output, 'wb') as f:
         f.write(output)


main()

_______________________________________________
cifs-protocol mailing list
cifs-protocol@lists.samba.org
https://lists.samba.org/mailman/listinfo/cifs-protocol


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic